You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2021/02/21 01:13:40 UTC

[flink] branch master updated (c77a686 -> 86015c7)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c77a686  [FLINK-21187][runtime] Adds exception history
     add 2229300  [hotfix][core] Minor code clean-ups in HeapMemorySegment.
     add 0a664e1  [FLINK-21417][core] Migrate on-heap use cases of HybridMemorySegment to HeapMemorySegment.
     add 10df870  [FLINK-21417][core] Move some access methods from HybridMemorySegment to MemorySegment.
     new 046f950  [FLINK-21417][core] Turn HybridMemorySegment into OffHeapMemorySegment.
     new 94362fa  [FLINK-21417][core] Separate OffHeapMemorySegment into DirectMemorySegment and UnsafeMemorySegment.
     new 1d0f5e5  [FLINK-21417][core] Re-abstract wrapping methods for memory segments.
     new 86015c7  [FLINK-21417][core] Update MemorySegmentFactory with explicit segment types.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../core/memory/ByteArrayInputStreamWithPos.java   |   4 +-
 .../flink/core/memory/DirectMemorySegment.java     |  52 +++
 .../flink/core/memory/HeapMemorySegment.java       |  91 +----
 .../flink/core/memory/HybridMemorySegment.java     | 404 ---------------------
 .../apache/flink/core/memory/MemorySegment.java    | 246 +++++++++++--
 .../flink/core/memory/MemorySegmentFactory.java    |  53 ++-
 .../flink/core/memory/OffHeapMemorySegment.java    |  88 +++++
 .../flink/core/memory/UnsafeMemorySegment.java     |  67 ++++
 .../api/common/typeutils/ComparatorTestBase.java   |   2 +-
 .../flink/core/memory/CrossSegmentTypeTest.java    |  12 +-
 .../core/memory/DataInputOutputSerializerTest.java |   2 +-
 ...gmentTest.java => DirectMemorySegmentTest.java} |  18 +-
 .../flink/core/memory/EndiannessAccessChecks.java  |  34 +-
 .../flink/core/memory/HeapMemorySegmentTest.java   |   4 +-
 .../core/memory/HybridOnHeapMemorySegmentTest.java | 101 ------
 .../flink/core/memory/MemorySegmentChecksTest.java |  46 +--
 .../core/memory/MemorySegmentFactoryTest.java      |  15 +-
 .../core/memory/MemorySegmentUndersizedTest.java   |  33 +-
 .../core/memory/OperationsOnFreedSegmentTest.java  |  28 +-
 ...gmentTest.java => UnsafeMemorySegmentTest.java} |  17 +-
 .../apache/flink/types/NormalizableKeyTest.java    |   2 +-
 .../runtime/io/disk/FileBasedBufferIterator.java   |   4 +-
 .../network/api/serialization/EventSerializer.java |   4 +-
 .../api/serialization/NonSpanningWrapper.java      |   2 +-
 .../network/api/serialization/SpanningWrapper.java |  14 +-
 .../io/network/buffer/BufferCompressor.java        |   3 +-
 .../io/network/buffer/BufferDecompressor.java      |   3 +-
 .../io/network/buffer/NetworkBufferPool.java       |   2 +-
 .../io/network/netty/NetworkBufferAllocator.java   |   2 +-
 .../network/partition/BufferReaderWriterUtil.java  |   2 +-
 .../network/partition/FileChannelBoundedData.java  |   3 +-
 .../network/partition/PartitionSortedBuffer.java   |   2 +-
 .../partition/SortMergeResultPartition.java        |   2 +-
 .../partition/SortMergeSubpartitionReader.java     |   3 +-
 .../partition/consumer/SingleInputGate.java        |   2 +-
 .../apache/flink/runtime/memory/MemoryManager.java |   8 +-
 .../channel/ChannelStateCheckpointWriterTest.java  |  10 +-
 .../channel/ChannelStateChunkReaderTest.java       |   2 +-
 .../channel/ChannelStateSerializerImplTest.java    |   9 +-
 ...ChannelStateWriteRequestDispatcherImplTest.java |   2 +-
 .../ChannelStateWriteRequestDispatcherTest.java    |   4 +-
 .../channel/ChannelStateWriterImplTest.java        |   4 +-
 .../SequentialChannelStateReaderImplTest.java      |   2 +-
 .../AsynchronousBufferFileWriterTest.java          |   2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |   4 +-
 .../BufferFileWriterFileSegmentReaderTest.java     |   5 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |   2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |   4 +-
 .../network/api/serialization/PagedViewsTest.java  |   4 +-
 .../SpanningRecordSerializationTest.java           |   2 +-
 .../api/serialization/SpanningWrapperTest.java     |   8 +-
 .../AbstractCollectingResultPartitionWriter.java   |   2 +-
 .../buffer/BufferBuilderAndConsumerTest.java       |   2 +-
 .../io/network/buffer/BufferBuilderTestUtils.java  |  10 +-
 .../io/network/buffer/BufferCompressionTest.java   |   4 +-
 .../BufferConsumerWithPartialRecordLengthTest.java |   2 +-
 .../io/network/buffer/NetworkBufferTest.java       |   4 +-
 .../network/buffer/ReadOnlySlicedBufferTest.java   |   2 +-
 .../NettyMessageClientDecoderDelegateTest.java     |   2 +-
 .../NettyMessageClientSideSerializationTest.java   |   4 +-
 .../BoundedBlockingSubpartitionWriteReadTest.java  |   2 +-
 .../partition/BufferReaderWriterUtilTest.java      |   6 +-
 .../network/partition/InputChannelTestUtils.java   |   3 +-
 .../partition/PartitionSortedBufferTest.java       |  10 +-
 .../partition/PartitionedFileWriteReadTest.java    |  15 +-
 .../partition/PipelinedSubpartitionTest.java       |   2 +-
 .../partition/SortMergeResultPartitionTest.java    |   2 +-
 .../partition/consumer/SingleInputGateTest.java    |   2 +-
 .../runtime/io/network/util/TestBufferFactory.java |   4 +-
 .../io/network/util/TestSubpartitionProducer.java  |   3 +-
 .../runtime/memory/MemorySegmentSimpleTest.java    |   2 +-
 .../operators/hash/CompactingHashTableTest.java    |   2 +-
 .../hash/HashTablePerformanceComparison.java       |   2 +-
 .../runtime/operators/hash/HashTableTest.java      |   2 +-
 .../hash/InPlaceMutableHashTableTest.java          |   2 +-
 .../operators/hash/MutableHashTableTestBase.java   |   2 +-
 .../flink/runtime/operators/util/BitSetTest.java   |   2 +-
 .../runtime/operators/util/BloomFilterTest.java    |   2 +-
 .../runtime/state/ChannelPersistenceITCase.java    |   4 +-
 .../state/heap/CopyOnWriteSkipListStateMap.java    |   4 +-
 .../runtime/state/heap/SkipListKeySerializer.java  |   8 +-
 .../flink/runtime/state/heap/SkipListUtils.java    |   2 +-
 .../CopyOnWriteSkipListStateMapBasicOpTest.java    |   4 +-
 .../state/heap/SkipListKeyComparatorTest.java      |  12 +-
 .../runtime/state/heap/SkipListSerializerTest.java |   4 +-
 .../runtime/state/heap/SkipListUtilsTest.java      |  10 +-
 .../flink/runtime/state/heap/TestAllocator.java    |   2 +-
 .../CheckpointBarrierTrackerTest.java              |   2 +-
 .../flink/table/data/binary/BinaryArrayData.java   |   4 +-
 .../flink/table/data/binary/BinaryMapData.java     |   4 +-
 .../table/data/binary/BinaryRawValueData.java      |   8 +-
 .../flink/table/data/binary/BinaryRowData.java     |   2 +-
 .../flink/table/data/binary/BinaryStringData.java  |   8 +-
 .../flink/table/data/binary/NestedRowData.java     |   2 +-
 .../planner/codegen/SortCodeGeneratorTest.java     |   2 +-
 .../flink/table/data/binary/BinaryRowDataUtil.java |   2 +-
 .../table/data/writer/AbstractBinaryWriter.java    |   4 +-
 .../flink/table/data/writer/BinaryArrayWriter.java |   2 +-
 .../flink/table/data/writer/BinaryRowWriter.java   |   2 +-
 .../raw/RawFormatDeserializationSchema.java        |   3 +-
 .../formats/raw/RawFormatSerializationSchema.java  |  11 +-
 .../runtime/hashtable/BaseHybridHashTable.java     |   5 +-
 .../runtime/hashtable/BinaryHashBucketArea.java    |   3 +-
 .../table/runtime/hashtable/LongHashPartition.java |   2 +-
 .../runtime/io/CompressedBlockChannelReader.java   |   3 +-
 .../runtime/io/CompressedBlockChannelWriter.java   |   2 +-
 ...CompressedHeaderlessChannelReaderInputView.java |   4 +-
 ...ompressedHeaderlessChannelWriterOutputView.java |   4 +-
 .../BytesHashMapSpillMemorySegmentPool.java        |   2 +-
 .../runtime/typeutils/ArrayDataSerializer.java     |   2 +-
 .../runtime/typeutils/BinaryRowDataSerializer.java |   4 +-
 .../table/runtime/typeutils/MapDataSerializer.java |   2 +-
 .../runtime/typeutils/RawValueDataSerializer.java  |   4 +-
 .../flink/table/runtime/util/FileChannelUtil.java  |   8 +-
 .../runtime/util/ResettableExternalBuffer.java     |   2 +-
 .../collections/binary/AbstractBytesHashMap.java   |   2 +-
 .../flink/table/data/BinaryArrayDataTest.java      |   4 +-
 .../apache/flink/table/data/BinaryRowDataTest.java |  25 +-
 .../flink/table/data/BinaryStringDataTest.java     |  28 +-
 .../table/data/binary/BinarySegmentUtilsTest.java  |  60 +--
 .../flink/table/data/util/DataFormatTestUtil.java  |   9 +-
 .../table/runtime/operators/sort/SortUtilTest.java |   4 +-
 .../operators/sort/TestMemorySegmentPool.java      |   2 +-
 .../collections/binary/BytesHashMapTestBase.java   |   7 +-
 .../manual/HashTableRecordWidthCombinations.java   |   2 +-
 .../CaseClassNormalizedKeySortingTest.scala        |   2 +-
 126 files changed, 814 insertions(+), 1020 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
 delete mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/core/memory/UnsafeMemorySegment.java
 rename flink-core/src/test/java/org/apache/flink/core/memory/{HybridOffHeapDirectMemorySegmentTest.java => DirectMemorySegmentTest.java} (76%)
 delete mode 100644 flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java
 rename flink-core/src/test/java/org/apache/flink/core/memory/{HybridOffHeapUnsafeMemorySegmentTest.java => UnsafeMemorySegmentTest.java} (70%)


[flink] 02/04: [FLINK-21417][core] Separate OffHeapMemorySegment into DirectMemorySegment and UnsafeMemorySegment.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 94362fa8917618ea3fa2a46b072abe528034070d
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Feb 19 16:20:03 2021 +0800

    [FLINK-21417][core] Separate OffHeapMemorySegment into DirectMemorySegment and UnsafeMemorySegment.
---
 .../flink/core/memory/DirectMemorySegment.java     | 57 ++++++++++++++++++
 .../flink/core/memory/MemorySegmentFactory.java    |  6 +-
 .../flink/core/memory/OffHeapMemorySegment.java    | 51 +---------------
 .../flink/core/memory/UnsafeMemorySegment.java     | 67 ++++++++++++++++++++++
 .../flink/core/memory/CrossSegmentTypeTest.java    |  4 +-
 ...gmentTest.java => DirectMemorySegmentTest.java} |  8 +--
 .../flink/core/memory/MemorySegmentChecksTest.java | 18 ++++--
 .../core/memory/MemorySegmentUndersizedTest.java   | 12 ++--
 .../core/memory/OperationsOnFreedSegmentTest.java  | 22 +++----
 ...gmentTest.java => UnsafeMemorySegmentTest.java} |  6 +-
 10 files changed, 170 insertions(+), 81 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
new file mode 100644
index 0000000..b878f18
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a piece of direct off-heap memory managed by Flink.
+ *
+ * <p>Note that memory segments should usually not be allocated manually, but rather through the
+ * {@link MemorySegmentFactory}.
+ */
+@Internal
+public final class DirectMemorySegment extends OffHeapMemorySegment {
+
+    /**
+     * Creates a new memory segment that represents the memory backing the given direct byte buffer.
+     * Note that the given ByteBuffer must be direct {@link
+     * java.nio.ByteBuffer#allocateDirect(int)}, otherwise this method will throw an
+     * IllegalArgumentException.
+     *
+     * <p>The memory segment references the given owner.
+     *
+     * @param buffer The byte buffer whose memory is represented by this memory segment.
+     * @param owner The owner referenced by this memory segment.
+     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
+     */
+    DirectMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
+        super(buffer, owner);
+    }
+
+    @Override
+    public ByteBuffer wrap(int offset, int length) {
+        return wrapInternal(offset, length);
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 098f34c..b60c836 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -130,7 +130,7 @@ public final class MemorySegmentFactory {
      */
     public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
         ByteBuffer memory = allocateDirectMemory(size);
-        return new OffHeapMemorySegment(memory, owner);
+        return new DirectMemorySegment(memory, owner);
     }
 
     @VisibleForTesting
@@ -177,7 +177,7 @@ public final class MemorySegmentFactory {
         ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
         Runnable cleaner =
                 MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, customCleanupAction);
-        return new OffHeapMemorySegment(offHeapBuffer, owner, false, cleaner);
+        return new UnsafeMemorySegment(offHeapBuffer, owner, cleaner);
     }
 
     /**
@@ -192,6 +192,6 @@ public final class MemorySegmentFactory {
      * @return A new memory segment representing the given off-heap memory.
      */
     public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
-        return new OffHeapMemorySegment(memory, null);
+        return new DirectMemorySegment(memory, null);
     }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
index d41c93d..954e2cb 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
@@ -39,21 +39,12 @@ import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
  * {@link MemorySegmentFactory}.
  */
 @Internal
-public final class OffHeapMemorySegment extends MemorySegment {
+public abstract class OffHeapMemorySegment extends MemorySegment {
     /**
      * The direct byte buffer that wraps the off-heap memory. This memory segment holds a reference
      * to that buffer, so as long as this memory segment lives, the memory will not be released.
      */
-    @Nullable private ByteBuffer offHeapBuffer;
-
-    @Nullable private final Runnable cleaner;
-
-    /**
-     * Wrapping is not allowed when the underlying memory is unsafe. Unsafe memory can be actively
-     * released, without reference counting. Therefore, access from wrapped buffers, which may not
-     * be aware of the releasing of memory, could be risky.
-     */
-    private final boolean allowWrap;
+    private ByteBuffer offHeapBuffer;
 
     /**
      * Creates a new memory segment that represents the memory backing the given direct byte buffer.
@@ -68,32 +59,8 @@ public final class OffHeapMemorySegment extends MemorySegment {
      * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
      */
     OffHeapMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
-        this(buffer, owner, true, null);
-    }
-
-    /**
-     * Creates a new memory segment that represents the memory backing the given direct byte buffer.
-     * Note that the given ByteBuffer must be direct {@link
-     * java.nio.ByteBuffer#allocateDirect(int)}, otherwise this method with throw an
-     * IllegalArgumentException.
-     *
-     * <p>The memory segment references the given owner.
-     *
-     * @param buffer The byte buffer whose memory is represented by this memory segment.
-     * @param owner The owner references by this memory segment.
-     * @param allowWrap Whether wrapping {@link ByteBuffer}s from the segment is allowed.
-     * @param cleaner The cleaner to be called on free segment.
-     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
-     */
-    OffHeapMemorySegment(
-            @Nonnull ByteBuffer buffer,
-            @Nullable Object owner,
-            boolean allowWrap,
-            @Nullable Runnable cleaner) {
         super(getByteBufferAddress(buffer), buffer.capacity(), owner);
         this.offHeapBuffer = buffer;
-        this.allowWrap = allowWrap;
-        this.cleaner = cleaner;
     }
 
     // -------------------------------------------------------------------------
@@ -103,22 +70,10 @@ public final class OffHeapMemorySegment extends MemorySegment {
     @Override
     public void free() {
         super.free();
-        if (cleaner != null) {
-            cleaner.run();
-        }
         offHeapBuffer = null; // to enable GC of unsafe memory
     }
 
-    @Override
-    public ByteBuffer wrap(int offset, int length) {
-        if (!allowWrap) {
-            throw new UnsupportedOperationException(
-                    "Wrap is not supported by this segment. This usually indicates that the underlying memory is unsafe, thus transferring of ownership is not allowed.");
-        }
-        return wrapInternal(offset, length);
-    }
-
-    private ByteBuffer wrapInternal(int offset, int length) {
+    protected ByteBuffer wrapInternal(int offset, int length) {
         if (!isFreed()) {
             try {
                 ByteBuffer wrapper = offHeapBuffer.duplicate();
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/UnsafeMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/UnsafeMemorySegment.java
new file mode 100644
index 0000000..f075991
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/UnsafeMemorySegment.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a piece of unsafe off-heap memory managed by Flink.
+ *
+ * <p>Note that memory segments should usually not be allocated manually, but rather through the
+ * {@link MemorySegmentFactory}.
+ */
+@Internal
+public class UnsafeMemorySegment extends OffHeapMemorySegment {
+    private final Runnable cleaner;
+
+    /**
+     * Creates a new memory segment that represents the memory backing the given unsafe byte buffer.
+     * Note that the given ByteBuffer must be direct {@link
+     * java.nio.ByteBuffer#allocateDirect(int)}, otherwise this method will throw an
+     * IllegalArgumentException.
+     *
+     * <p>The memory segment references the given owner.
+     *
+     * @param buffer The byte buffer whose memory is represented by this memory segment.
+     * @param owner The owner referenced by this memory segment.
+     * @param cleaner The cleaner to be run when the segment is freed.
+     * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
+     */
+    UnsafeMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner, Runnable cleaner) {
+        super(buffer, owner);
+        this.cleaner = Preconditions.checkNotNull(cleaner);
+    }
+
+    @Override
+    public void free() {
+        super.free();
+        cleaner.run();
+    }
+
+    @Override
+    public ByteBuffer wrap(int offset, int length) {
+        throw new UnsupportedOperationException("Wrap is not supported by unsafe memory segment.");
+    }
+}
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
index 913f119..a49b802 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -29,8 +29,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Verifies interoperability between {@link HeapMemorySegment} and {@link OffHeapMemorySegment} (in
- * both heap and off-heap modes).
+ * Verifies interoperability between {@link HeapMemorySegment}, {@link DirectMemorySegment} and
+ * {@link UnsafeMemorySegment}.
  */
 public class CrossSegmentTypeTest {
 
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java
similarity index 88%
rename from flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
rename to flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java
index 0693955..446b896 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java
@@ -30,11 +30,11 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using direct memory. */
+/** Tests for the {@link DirectMemorySegment}. */
 @RunWith(Parameterized.class)
-public class HybridOffHeapDirectMemorySegmentTest extends MemorySegmentTestBase {
+public class DirectMemorySegmentTest extends MemorySegmentTestBase {
 
-    public HybridOffHeapDirectMemorySegmentTest(int pageSize) {
+    public DirectMemorySegmentTest(int pageSize) {
         super(pageSize);
     }
 
@@ -49,7 +49,7 @@ public class HybridOffHeapDirectMemorySegmentTest extends MemorySegmentTestBase
     }
 
     @Test
-    public void testHybridHeapSegmentSpecifics() {
+    public void testDirectSegmentSpecifics() {
         final int bufSize = 411;
         OffHeapMemorySegment seg = (OffHeapMemorySegment) createSegment(bufSize);
 
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
index d46e74f..df852fe 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
@@ -40,13 +40,23 @@ public class MemorySegmentChecksTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void testHybridOffHeapNullBuffer2() {
-        new OffHeapMemorySegment((ByteBuffer) null, new Object());
+    public void testDirectNullBuffer() {
+        new DirectMemorySegment((ByteBuffer) null, new Object());
     }
 
     @Test(expected = IllegalArgumentException.class)
-    public void testHybridNonDirectBuffer() {
-        new OffHeapMemorySegment(ByteBuffer.allocate(1024), new Object());
+    public void testDirectNonDirectBuffer() {
+        new DirectMemorySegment(ByteBuffer.allocate(1024), new Object());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testUnsafeNullBuffer() {
+        new UnsafeMemorySegment((ByteBuffer) null, new Object(), () -> {});
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testUnsafeNonDirectBuffer() {
+        new UnsafeMemorySegment(ByteBuffer.allocate(1024), new Object(), () -> {});
     }
 
     @Test(expected = IllegalArgumentException.class)
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
index ace47fb..93db20b 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
@@ -32,8 +32,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for undersized {@link HeapMemorySegment} and {@link OffHeapMemorySegment} (in both heap and
- * off-heap modes).
+ * Tests for undersized {@link HeapMemorySegment}, {@link DirectMemorySegment} and {@link
+ * UnsafeMemorySegment}.
  */
 public class MemorySegmentUndersizedTest {
 
@@ -46,7 +46,7 @@ public class MemorySegmentUndersizedTest {
     }
 
     @Test
-    public void testZeroSizeOffHeapHybridSegment() {
+    public void testZeroSizeDirectSegment() {
         MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(0);
 
         testZeroSizeBuffer(segment);
@@ -54,7 +54,7 @@ public class MemorySegmentUndersizedTest {
     }
 
     @Test
-    public void testZeroSizeOffHeapUnsafeHybridSegment() {
+    public void testZeroSizeUnsafeSegment() {
         MemorySegment segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(0);
 
         testZeroSizeBuffer(segment);
@@ -67,12 +67,12 @@ public class MemorySegmentUndersizedTest {
     }
 
     @Test
-    public void testSizeOneOffHeapHybridSegment() {
+    public void testSizeOneDirectSegment() {
         testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateUnpooledOffHeapMemory(1));
     }
 
     @Test
-    public void testSizeOneOffHeapUnsafeHybridSegment() {
+    public void testSizeOneUnsafeSegment() {
         testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateOffHeapUnsafeMemory(1));
     }
 
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
index cb7645c..63f7468 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
@@ -50,17 +50,18 @@ public class OperationsOnFreedSegmentTest {
     @Test
     public void testCompare() {
         MemorySegment aliveHeap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment aliveHybridOffHeap =
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
+        MemorySegment aliveDirect = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
+        MemorySegment aliveUnsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
 
         MemorySegment freedHeap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment freedHybridOffHeap =
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
+        MemorySegment freedDirect = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
+        MemorySegment freedUnsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
         freedHeap.free();
-        freedHybridOffHeap.free();
+        freedDirect.free();
+        freedUnsafe.free();
 
-        MemorySegment[] alive = {aliveHeap, aliveHybridOffHeap};
-        MemorySegment[] free = {freedHeap, freedHybridOffHeap};
+        MemorySegment[] alive = {aliveHeap, aliveDirect, aliveUnsafe};
+        MemorySegment[] free = {freedHeap, freedDirect, freedUnsafe};
 
         // alive with free
         for (MemorySegment seg1 : alive) {
@@ -125,11 +126,10 @@ public class OperationsOnFreedSegmentTest {
 
     private static MemorySegment[] createTestSegments() {
         MemorySegment heap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment hybridOffHeap = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
-        MemorySegment hybridOffHeapUnsafe =
-                MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
+        MemorySegment direct = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
+        MemorySegment unsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
 
-        MemorySegment[] segments = {heap, hybridOffHeap, hybridOffHeapUnsafe};
+        MemorySegment[] segments = {heap, direct, unsafe};
 
         return segments;
     }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java
similarity index 89%
rename from flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
rename to flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java
index 70cec1c4..9d02287 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java
@@ -26,11 +26,11 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertTrue;
 
-/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using unsafe memory. */
+/** Tests for the {@link UnsafeMemorySegment}. */
 @RunWith(Parameterized.class)
-public class HybridOffHeapUnsafeMemorySegmentTest extends MemorySegmentTestBase {
+public class UnsafeMemorySegmentTest extends MemorySegmentTestBase {
 
-    public HybridOffHeapUnsafeMemorySegmentTest(int pageSize) {
+    public UnsafeMemorySegmentTest(int pageSize) {
         super(pageSize);
     }
 


[flink] 03/04: [FLINK-21417][core] Re-abstract wrapping methods for memory segments.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d0f5e5bc4e4926554a2f381b96ee4d569de8af5
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Feb 19 16:31:38 2021 +0800

    [FLINK-21417][core] Re-abstract wrapping methods for memory segments.
---
 .../flink/core/memory/DirectMemorySegment.java     |  5 -----
 .../flink/core/memory/HeapMemorySegment.java       | 18 +++---------------
 .../apache/flink/core/memory/MemorySegment.java    | 22 +++++++++++++++++++---
 .../flink/core/memory/OffHeapMemorySegment.java    | 14 +-------------
 .../flink/core/memory/MemorySegmentChecksTest.java | 15 +++++----------
 5 files changed, 28 insertions(+), 46 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
index b878f18..068887d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DirectMemorySegment.java
@@ -49,9 +49,4 @@ public final class DirectMemorySegment extends OffHeapMemorySegment {
     DirectMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
         super(buffer, owner);
     }
-
-    @Override
-    public ByteBuffer wrap(int offset, int length) {
-        return wrapInternal(offset, length);
-    }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
index 37787c8..808a5cd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@@ -25,8 +25,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 /**
  * This class represents a piece of heap memory managed by Flink. The segment is backed by a byte
@@ -81,10 +79,10 @@ public final class HeapMemorySegment extends MemorySegment {
     }
 
     @Override
-    public ByteBuffer wrap(int offset, int length) {
-        try {
+    protected ByteBuffer wrapInternal(int offset, int length) {
+        if (!isFreed()) {
             return ByteBuffer.wrap(this.memory, offset, length);
-        } catch (NullPointerException e) {
+        } else {
             throw new IllegalStateException("segment has been freed");
         }
     }
@@ -151,16 +149,6 @@ public final class HeapMemorySegment extends MemorySegment {
         source.get(this.memory, offset, numBytes);
     }
 
-    @Override
-    public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-        throw new UnsupportedOperationException("Unsupported because not needed atm.");
-    }
-
-    @Override
-    public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
-        throw new UnsupportedOperationException("Unsupported because not needed atm.");
-    }
-
     // -------------------------------------------------------------------------
     //                             Factoring
     // -------------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index c34357b..d726e22 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -19,6 +19,7 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -277,7 +278,18 @@ public abstract class MemorySegment {
      * @throws IndexOutOfBoundsException Thrown, if offset is negative or larger than the memory
      *     segment size, or if the offset plus the length is larger than the segment size.
      */
-    public abstract ByteBuffer wrap(int offset, int length);
+    public ByteBuffer wrap(int offset, int length) {
+        return wrapInternal(offset, length);
+    }
+
+    /**
+     * This is an internal interface for wrapping the chunk of the underlying memory located between
+     * <tt>offset</tt> and <tt>offset + length</tt> in a NIO ByteBuffer, without transferring the
+     * ownership of the memory.
+     *
+     * @see #wrap(int, int)
+     */
+    protected abstract ByteBuffer wrapInternal(int offset, int length);
 
     /**
      * Gets the owner of this memory segment. Returns null, if the owner was not set.
@@ -1576,7 +1588,9 @@ public abstract class MemorySegment {
      * @param processFunction to be applied to the segment as {@link ByteBuffer}.
      * @return the value that the process function returns.
      */
-    public abstract <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction);
+    public final <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
+        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
+    }
 
     /**
      * Supplies a {@link ByteBuffer} that represents this entire segment to the given process
@@ -1588,5 +1602,7 @@ public abstract class MemorySegment {
      *
      * @param processConsumer to accept the segment as {@link ByteBuffer}.
      */
-    public abstract void processAsByteBuffer(Consumer<ByteBuffer> processConsumer);
+    public final void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
+        Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
+    }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
index 954e2cb..b5e41d3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
@@ -19,14 +19,11 @@
 package org.apache.flink.core.memory;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
 
@@ -73,6 +70,7 @@ public abstract class OffHeapMemorySegment extends MemorySegment {
         offHeapBuffer = null; // to enable GC of unsafe memory
     }
 
+    @Override
     protected ByteBuffer wrapInternal(int offset, int length) {
         if (!isFreed()) {
             try {
@@ -87,14 +85,4 @@ public abstract class OffHeapMemorySegment extends MemorySegment {
             throw new IllegalStateException("segment has been freed");
         }
     }
-
-    @Override
-    public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
-    }
-
-    @Override
-    public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
-        Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
-    }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
index df852fe..7d086e3 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
@@ -23,8 +23,6 @@ import org.junit.Test;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.nio.ByteBuffer;
-import java.util.function.Consumer;
-import java.util.function.Function;
 
 /** Tests for the sanity checks of the memory segments. */
 public class MemorySegmentChecksTest {
@@ -88,6 +86,11 @@ public class MemorySegmentChecksTest {
         }
 
         @Override
+        protected ByteBuffer wrapInternal(int offset, int length) {
+            return null;
+        }
+
+        @Override
         public byte get(int index) {
             return 0;
         }
@@ -112,13 +115,5 @@ public class MemorySegmentChecksTest {
 
         @Override
         public void put(int offset, ByteBuffer source, int numBytes) {}
-
-        @Override
-        public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-            return null;
-        }
-
-        @Override
-        public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {}
     }
 }


[flink] 01/04: [FLINK-21417][core] Turn HybridMemorySegment into OffHeapMemorySegment.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 046f950a820b35fde6f411fb90f42ab1b5eaaeff
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Feb 19 15:46:54 2021 +0800

    [FLINK-21417][core] Turn HybridMemorySegment into OffHeapMemorySegment.
    
    All on-heap specific logic is moved out.
---
 .../apache/flink/core/memory/MemorySegment.java    |  18 ++--
 .../flink/core/memory/MemorySegmentFactory.java    |   8 +-
 ...emorySegment.java => OffHeapMemorySegment.java} |  90 +++---------------
 .../flink/core/memory/CrossSegmentTypeTest.java    |   2 +-
 .../flink/core/memory/EndiannessAccessChecks.java  |  14 +--
 .../HybridOffHeapDirectMemorySegmentTest.java      |   4 +-
 .../HybridOffHeapUnsafeMemorySegmentTest.java      |   2 +-
 .../core/memory/HybridOnHeapMemorySegmentTest.java | 101 ---------------------
 .../flink/core/memory/MemorySegmentChecksTest.java |   9 +-
 .../core/memory/MemorySegmentFactoryTest.java      |   4 +-
 .../core/memory/MemorySegmentUndersizedTest.java   |  15 +--
 .../core/memory/OperationsOnFreedSegmentTest.java  |  12 +--
 .../apache/flink/runtime/memory/MemoryManager.java |   4 +-
 13 files changed, 49 insertions(+), 234 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index 997396d..c34357b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -38,10 +38,9 @@ import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
  * memory (byte array) or by off-heap memory.
  *
  * <p>The methods for individual memory access are specialized in the classes {@link
- * org.apache.flink.core.memory.HeapMemorySegment} and {@link
- * org.apache.flink.core.memory.HybridMemorySegment}. All methods that operate across two memory
- * segments are implemented in this class, to transparently handle the mixing of memory segment
- * types.
+ * org.apache.flink.core.memory.HeapMemorySegment} and {@link OffHeapMemorySegment}. All methods
+ * that operate across two memory segments are implemented in this class, to transparently handle
+ * the mixing of memory segment types.
  *
  * <p>This class fulfills conceptually a similar purpose as Java's {@link java.nio.ByteBuffer}. We
  * add this specialized class for various reasons:
@@ -91,8 +90,7 @@ import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
  * <p><i>Note on efficiency</i>: For best efficiency, the code that uses this class should make sure
  * that only one subclass is loaded, or that the methods that are abstract in this class are used
  * only from one of the subclasses (either the {@link
- * org.apache.flink.core.memory.HeapMemorySegment}, or the {@link
- * org.apache.flink.core.memory.HybridMemorySegment}).
+ * org.apache.flink.core.memory.HeapMemorySegment}, or the {@link OffHeapMemorySegment}).
  *
  * <p>That way, all the abstract methods in the MemorySegment base class have only one loaded actual
  * implementation. This is easy for the JIT to recognize through class hierarchy analysis, or by
@@ -127,22 +125,22 @@ public abstract class MemorySegment {
      * segment will point to undefined addresses outside the heap and may in out-of-order execution
      * cases cause segmentation faults.
      */
-    protected final byte[] heapMemory;
+    private final byte[] heapMemory;
 
     /**
      * The address to the data, relative to the heap memory byte array. If the heap memory byte
      * array is <tt>null</tt>, this becomes an absolute memory address outside the heap.
      */
-    protected long address;
+    private long address;
 
     /**
      * The address one byte after the last addressable byte, i.e. <tt>address + size</tt> while the
      * segment is not disposed.
      */
-    protected final long addressLimit;
+    private final long addressLimit;
 
     /** The size in bytes of the memory segment. */
-    protected final int size;
+    private final int size;
 
     /** Optional owner of the memory segment. */
     private final Object owner;
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index 05a2e65..098f34c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -31,7 +31,7 @@ import java.nio.ByteBuffer;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
- * A factory for (hybrid) memory segments ({@link HybridMemorySegment}).
+ * A factory for (hybrid) memory segments ({@link OffHeapMemorySegment}).
  *
  * <p>The purpose of this factory is to make sure that all memory segments for heap data are of the
  * same type. That way, the runtime does not mix the various specializations of the {@link
@@ -130,7 +130,7 @@ public final class MemorySegmentFactory {
      */
     public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
         ByteBuffer memory = allocateDirectMemory(size);
-        return new HybridMemorySegment(memory, owner);
+        return new OffHeapMemorySegment(memory, owner);
     }
 
     @VisibleForTesting
@@ -177,7 +177,7 @@ public final class MemorySegmentFactory {
         ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
         Runnable cleaner =
                 MemoryUtils.createMemoryGcCleaner(offHeapBuffer, address, customCleanupAction);
-        return new HybridMemorySegment(offHeapBuffer, owner, false, cleaner);
+        return new OffHeapMemorySegment(offHeapBuffer, owner, false, cleaner);
     }
 
     /**
@@ -192,6 +192,6 @@ public final class MemorySegmentFactory {
      * @return A new memory segment representing the given off-heap memory.
      */
     public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
-        return new HybridMemorySegment(memory, null);
+        return new OffHeapMemorySegment(memory, null);
     }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
similarity index 63%
rename from flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
rename to flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
index ccf8fd5..d41c93d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/OffHeapMemorySegment.java
@@ -24,9 +24,6 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -34,23 +31,15 @@ import java.util.function.Function;
 import static org.apache.flink.core.memory.MemoryUtils.getByteBufferAddress;
 
 /**
- * This class represents a piece of memory managed by Flink.
+ * This class represents a piece of off-heap memory managed by Flink.
  *
- * <p>The memory can be on-heap, off-heap direct or off-heap unsafe, this is transparently handled
- * by this class.
- *
- * <p>This class specializes byte access and byte copy calls for heap memory, while reusing the
- * multi-byte type accesses and cross-segment operations from the MemorySegment.
- *
- * <p>This class subsumes the functionality of the {@link
- * org.apache.flink.core.memory.HeapMemorySegment}, but is a bit less efficient for operations on
- * individual bytes.
+ * <p>The memory can be direct or unsafe, this is transparently handled by this class.
  *
  * <p>Note that memory segments should usually not be allocated manually, but rather through the
  * {@link MemorySegmentFactory}.
  */
 @Internal
-public final class HybridMemorySegment extends MemorySegment {
+public final class OffHeapMemorySegment extends MemorySegment {
     /**
      * The direct byte buffer that wraps the off-heap memory. This memory segment holds a reference
      * to that buffer, so as long as this memory segment lives, the memory will not be released.
@@ -78,7 +67,7 @@ public final class HybridMemorySegment extends MemorySegment {
      * @param owner The owner references by this memory segment.
      * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
      */
-    HybridMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
+    OffHeapMemorySegment(@Nonnull ByteBuffer buffer, @Nullable Object owner) {
         this(buffer, owner, true, null);
     }
 
@@ -96,7 +85,7 @@ public final class HybridMemorySegment extends MemorySegment {
      * @param cleaner The cleaner to be called on free segment.
      * @throws IllegalArgumentException Thrown, if the given ByteBuffer is not direct.
      */
-    HybridMemorySegment(
+    OffHeapMemorySegment(
             @Nonnull ByteBuffer buffer,
             @Nullable Object owner,
             boolean allowWrap,
@@ -107,21 +96,6 @@ public final class HybridMemorySegment extends MemorySegment {
         this.cleaner = cleaner;
     }
 
-    /**
-     * Creates a new memory segment that represents the memory of the byte array.
-     *
-     * <p>The memory segment references the given owner.
-     *
-     * @param buffer The byte array whose memory is represented by this memory segment.
-     * @param owner The owner references by this memory segment.
-     */
-    HybridMemorySegment(byte[] buffer, Object owner) {
-        super(buffer, owner);
-        this.offHeapBuffer = null;
-        this.allowWrap = true;
-        this.cleaner = null;
-    }
-
     // -------------------------------------------------------------------------
     //  MemorySegment operations
     // -------------------------------------------------------------------------
@@ -145,61 +119,27 @@ public final class HybridMemorySegment extends MemorySegment {
     }
 
     private ByteBuffer wrapInternal(int offset, int length) {
-        if (address <= addressLimit) {
-            if (heapMemory != null) {
-                return ByteBuffer.wrap(heapMemory, offset, length);
-            } else {
-                try {
-                    ByteBuffer wrapper = offHeapBuffer.duplicate();
-                    wrapper.limit(offset + length);
-                    wrapper.position(offset);
-                    return wrapper;
-                } catch (IllegalArgumentException e) {
-                    throw new IndexOutOfBoundsException();
-                }
+        if (!isFreed()) {
+            try {
+                ByteBuffer wrapper = offHeapBuffer.duplicate();
+                wrapper.limit(offset + length);
+                wrapper.position(offset);
+                return wrapper;
+            } catch (IllegalArgumentException e) {
+                throw new IndexOutOfBoundsException();
             }
         } else {
             throw new IllegalStateException("segment has been freed");
         }
     }
 
-    // -------------------------------------------------------------------------
-    //  Bulk Read and Write Methods
-    // -------------------------------------------------------------------------
-
-    @Override
-    public final void get(DataOutput out, int offset, int length) throws IOException {
-        if (heapMemory != null) {
-            if (address <= addressLimit) {
-                out.write(heapMemory, offset, length);
-            } else {
-                throw new IllegalStateException("segment has been freed");
-            }
-        } else {
-            super.get(out, offset, length);
-        }
-    }
-
-    @Override
-    public final void put(DataInput in, int offset, int length) throws IOException {
-        if (heapMemory != null) {
-            if (address <= addressLimit) {
-                in.readFully(heapMemory, offset, length);
-            } else {
-                throw new IllegalStateException("segment has been freed");
-            }
-        } else {
-            super.put(in, offset, length);
-        }
-    }
-
     @Override
     public <T> T processAsByteBuffer(Function<ByteBuffer, T> processFunction) {
-        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size));
+        return Preconditions.checkNotNull(processFunction).apply(wrapInternal(0, size()));
     }
 
     @Override
     public void processAsByteBuffer(Consumer<ByteBuffer> processConsumer) {
-        Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size));
+        Preconditions.checkNotNull(processConsumer).accept(wrapInternal(0, size()));
     }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
index 37858f0..913f119 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -29,7 +29,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Verifies interoperability between {@link HeapMemorySegment} and {@link HybridMemorySegment} (in
+ * Verifies interoperability between {@link HeapMemorySegment} and {@link OffHeapMemorySegment} (in
  * both heap and off-heap modes).
  */
 public class CrossSegmentTypeTest {
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java b/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
index 5e9fdad..68b7d91 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals;
 
 /**
  * Verifies correct accesses with regards to endianness in {@link HeapMemorySegment} and {@link
- * HybridMemorySegment} (in both heap and off-heap modes).
+ * OffHeapMemorySegment} (in both heap and off-heap modes).
  */
 public class EndiannessAccessChecks {
 
@@ -62,7 +62,7 @@ public class EndiannessAccessChecks {
             rnd.setSeed(seed);
             for (int i = 0; i < 10000; i++) {
                 long val = rnd.nextLong();
-                int pos = rnd.nextInt(segment.size - 7);
+                int pos = rnd.nextInt(segment.size() - 7);
 
                 segment.putLongLittleEndian(pos, val);
                 long r = segment.getLongBigEndian(pos);
@@ -81,7 +81,7 @@ public class EndiannessAccessChecks {
             rnd.setSeed(seed);
             for (int i = 0; i < 10000; i++) {
                 int val = rnd.nextInt();
-                int pos = rnd.nextInt(segment.size - 3);
+                int pos = rnd.nextInt(segment.size() - 3);
 
                 segment.putIntLittleEndian(pos, val);
                 int r = segment.getIntBigEndian(pos);
@@ -100,7 +100,7 @@ public class EndiannessAccessChecks {
             rnd.setSeed(seed);
             for (int i = 0; i < 10000; i++) {
                 short val = (short) rnd.nextInt();
-                int pos = rnd.nextInt(segment.size - 1);
+                int pos = rnd.nextInt(segment.size() - 1);
 
                 segment.putShortLittleEndian(pos, val);
                 short r = segment.getShortBigEndian(pos);
@@ -119,7 +119,7 @@ public class EndiannessAccessChecks {
             rnd.setSeed(seed);
             for (int i = 0; i < 10000; i++) {
                 char val = (char) rnd.nextInt();
-                int pos = rnd.nextInt(segment.size - 1);
+                int pos = rnd.nextInt(segment.size() - 1);
 
                 segment.putCharLittleEndian(pos, val);
                 char r = segment.getCharBigEndian(pos);
@@ -138,7 +138,7 @@ public class EndiannessAccessChecks {
             rnd.setSeed(seed);
             for (int i = 0; i < 10000; i++) {
                 float val = rnd.nextFloat();
-                int pos = rnd.nextInt(segment.size - 3);
+                int pos = rnd.nextInt(segment.size() - 3);
 
                 segment.putFloatLittleEndian(pos, val);
                 float r = segment.getFloatBigEndian(pos);
@@ -160,7 +160,7 @@ public class EndiannessAccessChecks {
             rnd.setSeed(seed);
             for (int i = 0; i < 10000; i++) {
                 double val = rnd.nextDouble();
-                int pos = rnd.nextInt(segment.size - 7);
+                int pos = rnd.nextInt(segment.size() - 7);
 
                 segment.putDoubleLittleEndian(pos, val);
                 double r = segment.getDoubleBigEndian(pos);
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
index e39295ea..0693955 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapDirectMemorySegmentTest.java
@@ -30,7 +30,7 @@ import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-/** Tests for the {@link HybridMemorySegment} in off-heap mode using direct memory. */
+/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using direct memory. */
 @RunWith(Parameterized.class)
 public class HybridOffHeapDirectMemorySegmentTest extends MemorySegmentTestBase {
 
@@ -51,7 +51,7 @@ public class HybridOffHeapDirectMemorySegmentTest extends MemorySegmentTestBase
     @Test
     public void testHybridHeapSegmentSpecifics() {
         final int bufSize = 411;
-        HybridMemorySegment seg = (HybridMemorySegment) createSegment(bufSize);
+        OffHeapMemorySegment seg = (OffHeapMemorySegment) createSegment(bufSize);
 
         assertFalse(seg.isFreed());
         assertTrue(seg.isOffHeap());
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
index 2f77153..70cec1c4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOffHeapUnsafeMemorySegmentTest.java
@@ -26,7 +26,7 @@ import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertTrue;
 
-/** Tests for the {@link HybridMemorySegment} in off-heap mode using unsafe memory. */
+/** Tests for the {@link OffHeapMemorySegment} in off-heap mode using unsafe memory. */
 @RunWith(Parameterized.class)
 public class HybridOffHeapUnsafeMemorySegmentTest extends MemorySegmentTestBase {
 
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java
deleted file mode 100644
index f1398d5..0000000
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HybridOnHeapMemorySegmentTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.core.memory;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/** Tests for the {@link HybridMemorySegment} in on-heap mode. */
-@RunWith(Parameterized.class)
-public class HybridOnHeapMemorySegmentTest extends MemorySegmentTestBase {
-
-    public HybridOnHeapMemorySegmentTest(int pageSize) {
-        super(pageSize);
-    }
-
-    @Override
-    MemorySegment createSegment(int size) {
-        return MemorySegmentFactory.allocateUnpooledSegment(size);
-    }
-
-    @Override
-    MemorySegment createSegment(int size, Object owner) {
-        return MemorySegmentFactory.allocateUnpooledSegment(size, owner);
-    }
-
-    @Test
-    public void testHybridHeapSegmentSpecifics() {
-        final byte[] buffer = new byte[411];
-        HybridMemorySegment seg = new HybridMemorySegment(buffer, null);
-
-        assertFalse(seg.isFreed());
-        assertFalse(seg.isOffHeap());
-        assertEquals(buffer.length, seg.size());
-        assertTrue(buffer == seg.getArray());
-
-        ByteBuffer buf1 = seg.wrap(1, 2);
-        ByteBuffer buf2 = seg.wrap(3, 4);
-
-        assertTrue(buf1 != buf2);
-        assertEquals(1, buf1.position());
-        assertEquals(3, buf1.limit());
-        assertEquals(3, buf2.position());
-        assertEquals(7, buf2.limit());
-    }
-
-    @Test
-    public void testReadOnlyByteBufferPut() {
-        final byte[] buffer = new byte[100];
-        HybridMemorySegment seg = new HybridMemorySegment(buffer, null);
-
-        String content = "hello world";
-        ByteBuffer bb = ByteBuffer.allocate(20);
-        bb.put(content.getBytes());
-        bb.rewind();
-
-        int offset = 10;
-        int numBytes = 5;
-
-        ByteBuffer readOnlyBuf = bb.asReadOnlyBuffer();
-        assertFalse(readOnlyBuf.isDirect());
-        assertFalse(readOnlyBuf.hasArray());
-
-        seg.put(offset, readOnlyBuf, numBytes);
-
-        // verify the area before the written region.
-        for (int i = 0; i < offset; i++) {
-            assertEquals(0, buffer[i]);
-        }
-
-        // verify the region that is written.
-        assertEquals("hello", new String(buffer, offset, numBytes));
-
-        // verify the area after the written region.
-        for (int i = offset + numBytes; i < buffer.length; i++) {
-            assertEquals(0, buffer[i]);
-        }
-    }
-}
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
index 5b199c6..d46e74f 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentChecksTest.java
@@ -40,18 +40,13 @@ public class MemorySegmentChecksTest {
     }
 
     @Test(expected = NullPointerException.class)
-    public void testHybridHeapNullBuffer2() {
-        new HybridMemorySegment((byte[]) null, new Object());
-    }
-
-    @Test(expected = NullPointerException.class)
     public void testHybridOffHeapNullBuffer2() {
-        new HybridMemorySegment((ByteBuffer) null, new Object());
+        new OffHeapMemorySegment((ByteBuffer) null, new Object());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testHybridNonDirectBuffer() {
-        new HybridMemorySegment(ByteBuffer.allocate(1024), new Object());
+        new OffHeapMemorySegment(ByteBuffer.allocate(1024), new Object());
     }
 
     @Test(expected = IllegalArgumentException.class)
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
index a2286d2..3500e33 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -32,7 +32,7 @@ public class MemorySegmentFactoryTest {
         arraycopy(data, 0, changingData, 0, data.length);
         MemorySegment segment = MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length);
         changingData[0]++;
-        assertArrayEquals(data, segment.heapMemory);
+        assertArrayEquals(data, segment.getArray());
     }
 
     @Test
@@ -41,7 +41,7 @@ public class MemorySegmentFactoryTest {
         MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, data.length / 2);
         byte[] exp = new byte[segment.size()];
         arraycopy(data, 0, exp, 0, exp.length);
-        assertArrayEquals(exp, segment.heapMemory);
+        assertArrayEquals(exp, segment.getArray());
     }
 
     @Test
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
index d825ca3..ace47fb 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
@@ -32,7 +32,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for undersized {@link HeapMemorySegment} and {@link HybridMemorySegment} (in both heap and
+ * Tests for undersized {@link HeapMemorySegment} and {@link OffHeapMemorySegment} (in both heap and
  * off-heap modes).
  */
 public class MemorySegmentUndersizedTest {
@@ -46,14 +46,6 @@ public class MemorySegmentUndersizedTest {
     }
 
     @Test
-    public void testZeroSizeHeapHybridSegment() {
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(0);
-
-        testZeroSizeBuffer(segment);
-        testSegmentWithSizeLargerZero(segment);
-    }
-
-    @Test
     public void testZeroSizeOffHeapHybridSegment() {
         MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(0);
 
@@ -75,11 +67,6 @@ public class MemorySegmentUndersizedTest {
     }
 
     @Test
-    public void testSizeOneHeapHybridSegment() {
-        testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateUnpooledSegment(1));
-    }
-
-    @Test
     public void testSizeOneOffHeapHybridSegment() {
         testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateUnpooledOffHeapMemory(1));
     }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
index 438ddcd..cb7645c 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
@@ -34,7 +34,7 @@ import static org.junit.Assert.fail;
 
 /**
  * Various tests with freed memory segments for {@link HeapMemorySegment} and {@link
- * HybridMemorySegment} (in both heap and off-heap modes).
+ * OffHeapMemorySegment} (in both heap and off-heap modes).
  */
 public class OperationsOnFreedSegmentTest {
 
@@ -50,20 +50,17 @@ public class OperationsOnFreedSegmentTest {
     @Test
     public void testCompare() {
         MemorySegment aliveHeap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment aliveHybridHeap = MemorySegmentFactory.wrap(new byte[PAGE_SIZE]);
         MemorySegment aliveHybridOffHeap =
                 MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
 
         MemorySegment freedHeap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment freedHybridHeap = MemorySegmentFactory.wrap(new byte[PAGE_SIZE]);
         MemorySegment freedHybridOffHeap =
                 MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
         freedHeap.free();
-        freedHybridHeap.free();
         freedHybridOffHeap.free();
 
-        MemorySegment[] alive = {aliveHeap, aliveHybridHeap, aliveHybridOffHeap};
-        MemorySegment[] free = {freedHeap, freedHybridHeap, freedHybridOffHeap};
+        MemorySegment[] alive = {aliveHeap, aliveHybridOffHeap};
+        MemorySegment[] free = {freedHeap, freedHybridOffHeap};
 
         // alive with free
         for (MemorySegment seg1 : alive) {
@@ -128,12 +125,11 @@ public class OperationsOnFreedSegmentTest {
 
     private static MemorySegment[] createTestSegments() {
         MemorySegment heap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment hybridHeap = MemorySegmentFactory.wrap(new byte[PAGE_SIZE]);
         MemorySegment hybridOffHeap = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
         MemorySegment hybridOffHeapUnsafe =
                 MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
 
-        MemorySegment[] segments = {heap, hybridHeap, hybridOffHeap, hybridOffHeapUnsafe};
+        MemorySegment[] segments = {heap, hybridOffHeap, hybridOffHeapUnsafe};
 
         return segments;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 19c2dfd..82551cd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.memory;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.memory.HybridMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.OffHeapMemorySegment;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.LongFunctionWithException;
@@ -55,7 +55,7 @@ import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapU
  * reused later.
  *
  * <p>The memory segments are represented as off-heap unsafe memory regions (both via {@link
- * HybridMemorySegment}). Releasing a memory segment will make it re-claimable by the garbage
+ * OffHeapMemorySegment}). Releasing a memory segment will make it re-claimable by the garbage
  * collector, but does not necessarily release the underlying memory immediately.
  */
 public class MemoryManager {


[flink] 04/04: [FLINK-21417][core] Update MemorySegmentFactory with explicit segment types.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 86015c766ba186f18c2b3b41c3900ea4f809a1c2
Author: Xintong Song <to...@gmail.com>
AuthorDate: Fri Feb 19 17:12:54 2021 +0800

    [FLINK-21417][core] Update MemorySegmentFactory with explicit segment types.
    
    This closes #14966
---
 .../core/memory/ByteArrayInputStreamWithPos.java   |  4 +-
 .../flink/core/memory/HeapMemorySegment.java       | 52 -------------------
 .../flink/core/memory/MemorySegmentFactory.java    | 43 +++++++---------
 .../api/common/typeutils/ComparatorTestBase.java   |  2 +-
 .../flink/core/memory/CrossSegmentTypeTest.java    |  8 +--
 .../core/memory/DataInputOutputSerializerTest.java |  2 +-
 .../flink/core/memory/DirectMemorySegmentTest.java | 10 ++--
 .../flink/core/memory/EndiannessAccessChecks.java  | 22 +++-----
 .../flink/core/memory/HeapMemorySegmentTest.java   |  4 +-
 .../core/memory/MemorySegmentFactoryTest.java      | 11 ++--
 .../core/memory/MemorySegmentUndersizedTest.java   |  8 +--
 .../core/memory/OperationsOnFreedSegmentTest.java  | 12 ++---
 .../flink/core/memory/UnsafeMemorySegmentTest.java | 11 ++--
 .../apache/flink/types/NormalizableKeyTest.java    |  2 +-
 .../runtime/io/disk/FileBasedBufferIterator.java   |  4 +-
 .../network/api/serialization/EventSerializer.java |  4 +-
 .../api/serialization/NonSpanningWrapper.java      |  2 +-
 .../network/api/serialization/SpanningWrapper.java | 14 ++---
 .../io/network/buffer/BufferCompressor.java        |  3 +-
 .../io/network/buffer/BufferDecompressor.java      |  3 +-
 .../io/network/buffer/NetworkBufferPool.java       |  2 +-
 .../io/network/netty/NetworkBufferAllocator.java   |  2 +-
 .../network/partition/BufferReaderWriterUtil.java  |  2 +-
 .../network/partition/FileChannelBoundedData.java  |  3 +-
 .../network/partition/PartitionSortedBuffer.java   |  2 +-
 .../partition/SortMergeResultPartition.java        |  2 +-
 .../partition/SortMergeSubpartitionReader.java     |  3 +-
 .../partition/consumer/SingleInputGate.java        |  2 +-
 .../apache/flink/runtime/memory/MemoryManager.java |  4 +-
 .../channel/ChannelStateCheckpointWriterTest.java  | 10 ++--
 .../channel/ChannelStateChunkReaderTest.java       |  2 +-
 .../channel/ChannelStateSerializerImplTest.java    |  9 ++--
 ...ChannelStateWriteRequestDispatcherImplTest.java |  2 +-
 .../ChannelStateWriteRequestDispatcherTest.java    |  4 +-
 .../channel/ChannelStateWriterImplTest.java        |  4 +-
 .../SequentialChannelStateReaderImplTest.java      |  2 +-
 .../AsynchronousBufferFileWriterTest.java          |  2 +-
 .../iomanager/AsynchronousFileIOChannelTest.java   |  4 +-
 .../BufferFileWriterFileSegmentReaderTest.java     |  5 +-
 .../disk/iomanager/BufferFileWriterReaderTest.java |  2 +-
 .../io/disk/iomanager/IOManagerAsyncTest.java      |  4 +-
 .../network/api/serialization/PagedViewsTest.java  |  4 +-
 .../SpanningRecordSerializationTest.java           |  2 +-
 .../api/serialization/SpanningWrapperTest.java     |  8 +--
 .../AbstractCollectingResultPartitionWriter.java   |  2 +-
 .../buffer/BufferBuilderAndConsumerTest.java       |  2 +-
 .../io/network/buffer/BufferBuilderTestUtils.java  | 10 ++--
 .../io/network/buffer/BufferCompressionTest.java   |  4 +-
 .../BufferConsumerWithPartialRecordLengthTest.java |  2 +-
 .../io/network/buffer/NetworkBufferTest.java       |  4 +-
 .../network/buffer/ReadOnlySlicedBufferTest.java   |  2 +-
 .../NettyMessageClientDecoderDelegateTest.java     |  2 +-
 .../NettyMessageClientSideSerializationTest.java   |  4 +-
 .../BoundedBlockingSubpartitionWriteReadTest.java  |  2 +-
 .../partition/BufferReaderWriterUtilTest.java      |  6 +--
 .../network/partition/InputChannelTestUtils.java   |  3 +-
 .../partition/PartitionSortedBufferTest.java       | 10 ++--
 .../partition/PartitionedFileWriteReadTest.java    | 15 +++---
 .../partition/PipelinedSubpartitionTest.java       |  2 +-
 .../partition/SortMergeResultPartitionTest.java    |  2 +-
 .../partition/consumer/SingleInputGateTest.java    |  2 +-
 .../runtime/io/network/util/TestBufferFactory.java |  4 +-
 .../io/network/util/TestSubpartitionProducer.java  |  3 +-
 .../runtime/memory/MemorySegmentSimpleTest.java    |  2 +-
 .../operators/hash/CompactingHashTableTest.java    |  2 +-
 .../hash/HashTablePerformanceComparison.java       |  2 +-
 .../runtime/operators/hash/HashTableTest.java      |  2 +-
 .../hash/InPlaceMutableHashTableTest.java          |  2 +-
 .../operators/hash/MutableHashTableTestBase.java   |  2 +-
 .../flink/runtime/operators/util/BitSetTest.java   |  2 +-
 .../runtime/operators/util/BloomFilterTest.java    |  2 +-
 .../runtime/state/ChannelPersistenceITCase.java    |  4 +-
 .../state/heap/CopyOnWriteSkipListStateMap.java    |  4 +-
 .../runtime/state/heap/SkipListKeySerializer.java  |  8 +--
 .../flink/runtime/state/heap/SkipListUtils.java    |  2 +-
 .../CopyOnWriteSkipListStateMapBasicOpTest.java    |  4 +-
 .../state/heap/SkipListKeyComparatorTest.java      | 12 +++--
 .../runtime/state/heap/SkipListSerializerTest.java |  4 +-
 .../runtime/state/heap/SkipListUtilsTest.java      | 10 ++--
 .../flink/runtime/state/heap/TestAllocator.java    |  2 +-
 .../CheckpointBarrierTrackerTest.java              |  2 +-
 .../flink/table/data/binary/BinaryArrayData.java   |  4 +-
 .../flink/table/data/binary/BinaryMapData.java     |  4 +-
 .../table/data/binary/BinaryRawValueData.java      |  8 ++-
 .../flink/table/data/binary/BinaryRowData.java     |  2 +-
 .../flink/table/data/binary/BinaryStringData.java  |  8 +--
 .../flink/table/data/binary/NestedRowData.java     |  2 +-
 .../planner/codegen/SortCodeGeneratorTest.java     |  2 +-
 .../flink/table/data/binary/BinaryRowDataUtil.java |  2 +-
 .../table/data/writer/AbstractBinaryWriter.java    |  4 +-
 .../flink/table/data/writer/BinaryArrayWriter.java |  2 +-
 .../flink/table/data/writer/BinaryRowWriter.java   |  2 +-
 .../raw/RawFormatDeserializationSchema.java        |  3 +-
 .../formats/raw/RawFormatSerializationSchema.java  | 11 ++--
 .../runtime/hashtable/BaseHybridHashTable.java     |  5 +-
 .../runtime/hashtable/BinaryHashBucketArea.java    |  3 +-
 .../table/runtime/hashtable/LongHashPartition.java |  2 +-
 .../runtime/io/CompressedBlockChannelReader.java   |  3 +-
 .../runtime/io/CompressedBlockChannelWriter.java   |  2 +-
 ...CompressedHeaderlessChannelReaderInputView.java |  4 +-
 ...ompressedHeaderlessChannelWriterOutputView.java |  4 +-
 .../BytesHashMapSpillMemorySegmentPool.java        |  2 +-
 .../runtime/typeutils/ArrayDataSerializer.java     |  2 +-
 .../runtime/typeutils/BinaryRowDataSerializer.java |  4 +-
 .../table/runtime/typeutils/MapDataSerializer.java |  2 +-
 .../runtime/typeutils/RawValueDataSerializer.java  |  4 +-
 .../flink/table/runtime/util/FileChannelUtil.java  |  8 ++-
 .../runtime/util/ResettableExternalBuffer.java     |  2 +-
 .../collections/binary/AbstractBytesHashMap.java   |  2 +-
 .../flink/table/data/BinaryArrayDataTest.java      |  4 +-
 .../apache/flink/table/data/BinaryRowDataTest.java | 25 ++++-----
 .../flink/table/data/BinaryStringDataTest.java     | 28 +++++-----
 .../table/data/binary/BinarySegmentUtilsTest.java  | 60 ++++++++++++----------
 .../flink/table/data/util/DataFormatTestUtil.java  |  9 ++--
 .../table/runtime/operators/sort/SortUtilTest.java |  4 +-
 .../operators/sort/TestMemorySegmentPool.java      |  2 +-
 .../collections/binary/BytesHashMapTestBase.java   |  7 +--
 .../manual/HashTableRecordWidthCombinations.java   |  2 +-
 .../CaseClassNormalizedKeySortingTest.scala        |  2 +-
 119 files changed, 333 insertions(+), 369 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
index 5f622aa..19486c3 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayInputStreamWithPos.java
@@ -38,10 +38,10 @@ public class ByteArrayInputStreamWithPos extends MemorySegmentInputStreamWithPos
     }
 
     public ByteArrayInputStreamWithPos(byte[] buffer, int offset, int length) {
-        super(MemorySegmentFactory.wrap(buffer), offset, length);
+        super(MemorySegmentFactory.wrapHeapSegment(buffer), offset, length);
     }
 
     public void setBuffer(byte[] buffer, int off, int len) {
-        setSegment(MemorySegmentFactory.wrap(buffer), off, len);
+        setSegment(MemorySegmentFactory.wrapHeapSegment(buffer), off, len);
     }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
index 808a5cd..7263841 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HeapMemorySegment.java
@@ -148,56 +148,4 @@ public final class HeapMemorySegment extends MemorySegment {
         // ByteBuffer performs the boundary checks
         source.get(this.memory, offset, numBytes);
     }
-
-    // -------------------------------------------------------------------------
-    //                             Factoring
-    // -------------------------------------------------------------------------
-
-    /**
-     * A memory segment factory that produces heap memory segments. Note that this factory does not
-     * support to allocate off-heap memory.
-     */
-    public static final class HeapMemorySegmentFactory {
-
-        /**
-         * Creates a new memory segment that targets the given heap memory region.
-         *
-         * @param memory The heap memory region.
-         * @return A new memory segment that targets the given heap memory region.
-         */
-        public HeapMemorySegment wrap(byte[] memory) {
-            return new HeapMemorySegment(memory);
-        }
-
-        /**
-         * Allocates some unpooled memory and creates a new memory segment that represents that
-         * memory.
-         *
-         * @param size The size of the memory segment to allocate.
-         * @param owner The owner to associate with the memory segment.
-         * @return A new memory segment, backed by unpooled heap memory.
-         */
-        public HeapMemorySegment allocateUnpooledSegment(int size, Object owner) {
-            return new HeapMemorySegment(new byte[size], owner);
-        }
-
-        /**
-         * Creates a memory segment that wraps the given byte array.
-         *
-         * <p>This method is intended to be used for components which pool memory and create memory
-         * segments around long-lived memory regions.
-         *
-         * @param memory The heap memory to be represented by the memory segment.
-         * @param owner The owner to associate with the memory segment.
-         * @return A new memory segment representing the given heap memory.
-         */
-        public HeapMemorySegment wrapPooledHeapMemory(byte[] memory, Object owner) {
-            return new HeapMemorySegment(memory, owner);
-        }
-
-        /** Prevent external instantiation. */
-        HeapMemorySegmentFactory() {}
-    }
-
-    public static final HeapMemorySegmentFactory FACTORY = new HeapMemorySegmentFactory();
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index b60c836..b195f80 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -30,14 +30,7 @@ import java.nio.ByteBuffer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-/**
- * A factory for (hybrid) memory segments ({@link OffHeapMemorySegment}).
- *
- * <p>The purpose of this factory is to make sure that all memory segments for heap data are of the
- * same type. That way, the runtime does not mix the various specializations of the {@link
- * MemorySegment}. Not mixing them has shown to be beneficial to method specialization by the JIT
- * and to overall performance.
- */
+/** A factory for memory segments ({@link MemorySegment}). */
 @Internal
 public final class MemorySegmentFactory {
     private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class);
@@ -51,7 +44,7 @@ public final class MemorySegmentFactory {
      * @param buffer The heap memory region.
      * @return A new memory segment that targets the given heap memory region.
      */
-    public static MemorySegment wrap(byte[] buffer) {
+    public static HeapMemorySegment wrapHeapSegment(byte[] buffer) {
         return new HeapMemorySegment(buffer);
     }
 
@@ -64,11 +57,11 @@ public final class MemorySegmentFactory {
      * @return A new memory segment that targets a copy of the given heap memory region.
      * @throws IllegalArgumentException if start > end or end > bytes.length
      */
-    public static MemorySegment wrapCopy(byte[] bytes, int start, int end)
+    public static HeapMemorySegment wrapCopyHeapSegment(byte[] bytes, int start, int end)
             throws IllegalArgumentException {
         checkArgument(end >= start);
         checkArgument(end <= bytes.length);
-        MemorySegment copy = allocateUnpooledSegment(end - start);
+        HeapMemorySegment copy = allocateHeapSegment(end - start);
         copy.put(0, bytes, start, copy.size());
         return copy;
     }
@@ -78,34 +71,34 @@ public final class MemorySegmentFactory {
      *
      * @see ByteBuffer#putInt(int)
      */
-    public static MemorySegment wrapInt(int value) {
-        return wrap(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
+    public static HeapMemorySegment wrapIntHeapSegment(int value) {
+        return wrapHeapSegment(ByteBuffer.allocate(Integer.BYTES).putInt(value).array());
     }
 
     /**
      * Allocates some unpooled memory and creates a new memory segment that represents that memory.
      *
-     * <p>This method is similar to {@link #allocateUnpooledSegment(int, Object)}, but the memory
+     * <p>This method is similar to {@link #allocateHeapSegment(int, Object)}, but the memory
      * segment will have null as the owner.
      *
      * @param size The size of the memory segment to allocate.
      * @return A new memory segment, backed by unpooled heap memory.
      */
-    public static MemorySegment allocateUnpooledSegment(int size) {
-        return allocateUnpooledSegment(size, null);
+    public static HeapMemorySegment allocateHeapSegment(int size) {
+        return allocateHeapSegment(size, null);
     }
 
     /**
      * Allocates some unpooled memory and creates a new memory segment that represents that memory.
      *
-     * <p>This method is similar to {@link #allocateUnpooledSegment(int)}, but additionally sets the
+     * <p>This method is similar to {@link #allocateHeapSegment(int)}, but additionally sets the
      * owner of the memory segment.
      *
      * @param size The size of the memory segment to allocate.
      * @param owner The owner to associate with the memory segment.
      * @return A new memory segment, backed by unpooled heap memory.
      */
-    public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
+    public static HeapMemorySegment allocateHeapSegment(int size, Object owner) {
         return new HeapMemorySegment(new byte[size], owner);
     }
 
@@ -116,8 +109,8 @@ public final class MemorySegmentFactory {
      * @param size The size of the off-heap memory segment to allocate.
      * @return A new memory segment, backed by unpooled off-heap memory.
      */
-    public static MemorySegment allocateUnpooledOffHeapMemory(int size) {
-        return allocateUnpooledOffHeapMemory(size, null);
+    public static DirectMemorySegment allocateDirectSegment(int size) {
+        return allocateDirectSegment(size, null);
     }
 
     /**
@@ -128,14 +121,14 @@ public final class MemorySegmentFactory {
      * @param owner The owner to associate with the off-heap memory segment.
      * @return A new memory segment, backed by unpooled off-heap memory.
      */
-    public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
+    public static DirectMemorySegment allocateDirectSegment(int size, Object owner) {
         ByteBuffer memory = allocateDirectMemory(size);
         return new DirectMemorySegment(memory, owner);
     }
 
     @VisibleForTesting
-    public static MemorySegment allocateOffHeapUnsafeMemory(int size) {
-        return allocateOffHeapUnsafeMemory(size, null, NO_OP);
+    public static UnsafeMemorySegment allocateUnsafeSegment(int size) {
+        return allocateUnsafeSegment(size, null, NO_OP);
     }
 
     private static ByteBuffer allocateDirectMemory(int size) {
@@ -171,7 +164,7 @@ public final class MemorySegmentFactory {
      * @param customCleanupAction A custom action to run upon calling GC cleaner.
      * @return A new memory segment, backed by off-heap unsafe memory.
      */
-    public static MemorySegment allocateOffHeapUnsafeMemory(
+    public static UnsafeMemorySegment allocateUnsafeSegment(
             int size, Object owner, Runnable customCleanupAction) {
         long address = MemoryUtils.allocateUnsafe(size);
         ByteBuffer offHeapBuffer = MemoryUtils.wrapUnsafeMemoryWithByteBuffer(address, size);
@@ -191,7 +184,7 @@ public final class MemorySegmentFactory {
      *     segment.
      * @return A new memory segment representing the given off-heap memory.
      */
-    public static MemorySegment wrapOffHeapMemory(ByteBuffer memory) {
+    public static DirectMemorySegment wrapDirectSegment(ByteBuffer memory) {
         return new DirectMemorySegment(memory, null);
     }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index 0375af5..89a8acd 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -256,7 +256,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger {
     // Helper function that sets up a memory segment and normalizes the keys of the data array in it
     public MemorySegment setupNormalizedKeysMemSegment(
             T[] data, int normKeyLen, TypeComparator<T> comparator) {
-        MemorySegment memSeg = MemorySegmentFactory.allocateUnpooledSegment(2048);
+        MemorySegment memSeg = MemorySegmentFactory.allocateHeapSegment(2048);
 
         // Setup normalized Keys in the memory segment
         int offset = 0;
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
index a49b802..2f19e17 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/CrossSegmentTypeTest.java
@@ -160,9 +160,9 @@ public class CrossSegmentTypeTest {
     private static MemorySegment[] createSegments(int size) {
         MemorySegment[] segments = {
             new HeapMemorySegment(new byte[size]),
-            MemorySegmentFactory.allocateUnpooledSegment(size),
-            MemorySegmentFactory.allocateUnpooledOffHeapMemory(size),
-            MemorySegmentFactory.allocateOffHeapUnsafeMemory(size)
+            MemorySegmentFactory.allocateHeapSegment(size),
+            MemorySegmentFactory.allocateDirectSegment(size),
+            MemorySegmentFactory.allocateUnsafeSegment(size)
         };
         return segments;
     }
@@ -174,7 +174,7 @@ public class CrossSegmentTypeTest {
         byte[] expected = new byte[pageSize];
         byte[] actual = new byte[pageSize];
         byte[] unsafeCopy = new byte[pageSize];
-        MemorySegment unsafeCopySeg = MemorySegmentFactory.allocateUnpooledSegment(pageSize);
+        MemorySegment unsafeCopySeg = MemorySegmentFactory.allocateHeapSegment(pageSize);
 
         // zero out the memory
         seg1.put(0, expected);
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
index 542daf7..0e1af03 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/DataInputOutputSerializerTest.java
@@ -37,7 +37,7 @@ public class DataInputOutputSerializerTest {
         SerializationTestType randomInt = Util.randomRecord(SerializationTestTypeFactory.INT);
 
         DataOutputSerializer serializer = new DataOutputSerializer(randomInt.length());
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(randomInt.length());
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(randomInt.length());
 
         try {
             // empty buffer, read buffer should be empty
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java
index 446b896..ef1d89b 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/DirectMemorySegmentTest.java
@@ -39,19 +39,19 @@ public class DirectMemorySegmentTest extends MemorySegmentTestBase {
     }
 
     @Override
-    MemorySegment createSegment(int size) {
-        return MemorySegmentFactory.allocateUnpooledOffHeapMemory(size);
+    DirectMemorySegment createSegment(int size) {
+        return MemorySegmentFactory.allocateDirectSegment(size);
     }
 
     @Override
-    MemorySegment createSegment(int size, Object owner) {
-        return MemorySegmentFactory.allocateUnpooledOffHeapMemory(size, owner);
+    DirectMemorySegment createSegment(int size, Object owner) {
+        return MemorySegmentFactory.allocateDirectSegment(size, owner);
     }
 
     @Test
     public void testDirectSegmentSpecifics() {
         final int bufSize = 411;
-        OffHeapMemorySegment seg = (OffHeapMemorySegment) createSegment(bufSize);
+        DirectMemorySegment seg = createSegment(bufSize);
 
         assertFalse(seg.isFreed());
         assertTrue(seg.isOffHeap());
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java b/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
index 68b7d91..11c8f24 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/EndiannessAccessChecks.java
@@ -25,31 +25,25 @@ import java.util.Random;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Verifies correct accesses with regards to endianness in {@link HeapMemorySegment} and {@link
- * OffHeapMemorySegment} (in both heap and off-heap modes).
+ * Verifies correct accesses with regards to endianness in {@link HeapMemorySegment}, {@link
+ * DirectMemorySegment} and {@link UnsafeMemorySegment}.
  */
 public class EndiannessAccessChecks {
 
     @Test
     public void testHeapSegment() {
-        testBigAndLittleEndianAccessUnaligned(new HeapMemorySegment(new byte[11111]));
-    }
-
-    @Test
-    public void testHybridOnHeapSegment() {
-        testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.wrap(new byte[11111]));
+        testBigAndLittleEndianAccessUnaligned(
+                MemorySegmentFactory.wrapHeapSegment(new byte[11111]));
     }
 
     @Test
-    public void testHybridOffHeapSegment() {
-        testBigAndLittleEndianAccessUnaligned(
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(11111));
+    public void testDirectSegment() {
+        testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.allocateDirectSegment(11111));
     }
 
     @Test
-    public void testHybridOffHeapUnsafeSegment() {
-        testBigAndLittleEndianAccessUnaligned(
-                MemorySegmentFactory.allocateOffHeapUnsafeMemory(11111));
+    public void testUnsafeSegment() {
+        testBigAndLittleEndianAccessUnaligned(MemorySegmentFactory.allocateUnsafeSegment(11111));
     }
 
     private void testBigAndLittleEndianAccessUnaligned(MemorySegment segment) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/HeapMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/HeapMemorySegmentTest.java
index 7263c4f..c6ba1d2 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/HeapMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/HeapMemorySegmentTest.java
@@ -38,12 +38,12 @@ public class HeapMemorySegmentTest extends MemorySegmentTestBase {
     }
 
     @Override
-    MemorySegment createSegment(int size) {
+    HeapMemorySegment createSegment(int size) {
         return new HeapMemorySegment(new byte[size]);
     }
 
     @Override
-    MemorySegment createSegment(int size, Object owner) {
+    HeapMemorySegment createSegment(int size, Object owner) {
         return new HeapMemorySegment(new byte[size], owner);
     }
 
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
index 3500e33..4920eb4 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentFactoryTest.java
@@ -30,7 +30,8 @@ public class MemorySegmentFactoryTest {
         byte[] data = {1, 2, 3, 4, 5};
         byte[] changingData = new byte[data.length];
         arraycopy(data, 0, changingData, 0, data.length);
-        MemorySegment segment = MemorySegmentFactory.wrapCopy(changingData, 0, changingData.length);
+        MemorySegment segment =
+                MemorySegmentFactory.wrapCopyHeapSegment(changingData, 0, changingData.length);
         changingData[0]++;
         assertArrayEquals(data, segment.getArray());
     }
@@ -38,7 +39,7 @@ public class MemorySegmentFactoryTest {
     @Test
     public void testWrapPartialCopy() {
         byte[] data = {1, 2, 3, 5, 6};
-        MemorySegment segment = MemorySegmentFactory.wrapCopy(data, 0, data.length / 2);
+        MemorySegment segment = MemorySegmentFactory.wrapCopyHeapSegment(data, 0, data.length / 2);
         byte[] exp = new byte[segment.size()];
         arraycopy(data, 0, exp, 0, exp.length);
         assertArrayEquals(exp, segment.getArray());
@@ -46,16 +47,16 @@ public class MemorySegmentFactoryTest {
 
     @Test
     public void testWrapCopyEmpty() {
-        MemorySegmentFactory.wrapCopy(new byte[0], 0, 0);
+        MemorySegmentFactory.wrapCopyHeapSegment(new byte[0], 0, 0);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testWrapCopyWrongStart() {
-        MemorySegmentFactory.wrapCopy(new byte[] {1, 2, 3}, 10, 3);
+        MemorySegmentFactory.wrapCopyHeapSegment(new byte[] {1, 2, 3}, 10, 3);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testWrapCopyWrongEnd() {
-        MemorySegmentFactory.wrapCopy(new byte[] {1, 2, 3}, 0, 10);
+        MemorySegmentFactory.wrapCopyHeapSegment(new byte[] {1, 2, 3}, 0, 10);
     }
 }
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
index 93db20b..9690593 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/MemorySegmentUndersizedTest.java
@@ -47,7 +47,7 @@ public class MemorySegmentUndersizedTest {
 
     @Test
     public void testZeroSizeDirectSegment() {
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(0);
+        MemorySegment segment = MemorySegmentFactory.allocateDirectSegment(0);
 
         testZeroSizeBuffer(segment);
         testSegmentWithSizeLargerZero(segment);
@@ -55,7 +55,7 @@ public class MemorySegmentUndersizedTest {
 
     @Test
     public void testZeroSizeUnsafeSegment() {
-        MemorySegment segment = MemorySegmentFactory.allocateOffHeapUnsafeMemory(0);
+        MemorySegment segment = MemorySegmentFactory.allocateUnsafeSegment(0);
 
         testZeroSizeBuffer(segment);
         testSegmentWithSizeLargerZero(segment);
@@ -68,12 +68,12 @@ public class MemorySegmentUndersizedTest {
 
     @Test
     public void testSizeOneDirectSegment() {
-        testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateUnpooledOffHeapMemory(1));
+        testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateDirectSegment(1));
     }
 
     @Test
     public void testSizeOneUnsafeSegment() {
-        testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateOffHeapUnsafeMemory(1));
+        testSegmentWithSizeLargerZero(MemorySegmentFactory.allocateUnsafeSegment(1));
     }
 
     private static void testZeroSizeBuffer(MemorySegment segment) {
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
index 63f7468..eacdab7 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/OperationsOnFreedSegmentTest.java
@@ -50,12 +50,12 @@ public class OperationsOnFreedSegmentTest {
     @Test
     public void testCompare() {
         MemorySegment aliveHeap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment aliveDirect = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
-        MemorySegment aliveUnsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
+        MemorySegment aliveDirect = MemorySegmentFactory.allocateDirectSegment(PAGE_SIZE);
+        MemorySegment aliveUnsafe = MemorySegmentFactory.allocateUnsafeSegment(PAGE_SIZE);
 
         MemorySegment freedHeap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment freedDirect = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
-        MemorySegment freedUnsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
+        MemorySegment freedDirect = MemorySegmentFactory.allocateDirectSegment(PAGE_SIZE);
+        MemorySegment freedUnsafe = MemorySegmentFactory.allocateUnsafeSegment(PAGE_SIZE);
         freedHeap.free();
         freedDirect.free();
         freedUnsafe.free();
@@ -126,8 +126,8 @@ public class OperationsOnFreedSegmentTest {
 
     private static MemorySegment[] createTestSegments() {
         MemorySegment heap = new HeapMemorySegment(new byte[PAGE_SIZE]);
-        MemorySegment direct = MemorySegmentFactory.allocateUnpooledOffHeapMemory(PAGE_SIZE);
-        MemorySegment unsafe = MemorySegmentFactory.allocateOffHeapUnsafeMemory(PAGE_SIZE);
+        MemorySegment direct = MemorySegmentFactory.allocateDirectSegment(PAGE_SIZE);
+        MemorySegment unsafe = MemorySegmentFactory.allocateUnsafeSegment(PAGE_SIZE);
 
         MemorySegment[] segments = {heap, direct, unsafe};
 
diff --git a/flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java b/flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java
index 9d02287..5739dc2 100644
--- a/flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java
+++ b/flink-core/src/test/java/org/apache/flink/core/memory/UnsafeMemorySegmentTest.java
@@ -35,13 +35,13 @@ public class UnsafeMemorySegmentTest extends MemorySegmentTestBase {
     }
 
     @Override
-    MemorySegment createSegment(int size) {
-        return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size);
+    UnsafeMemorySegment createSegment(int size) {
+        return MemorySegmentFactory.allocateUnsafeSegment(size);
     }
 
     @Override
-    MemorySegment createSegment(int size, Object owner) {
-        return MemorySegmentFactory.allocateOffHeapUnsafeMemory(size, owner, () -> {});
+    UnsafeMemorySegment createSegment(int size, Object owner) {
+        return MemorySegmentFactory.allocateUnsafeSegment(size, owner, () -> {});
     }
 
     @Override
@@ -53,8 +53,7 @@ public class UnsafeMemorySegmentTest extends MemorySegmentTestBase {
     @Test
     public void testCallCleanerOnFree() {
         final CompletableFuture<Void> cleanerFuture = new CompletableFuture<>();
-        MemorySegmentFactory.allocateOffHeapUnsafeMemory(
-                        10, null, () -> cleanerFuture.complete(null))
+        MemorySegmentFactory.allocateUnsafeSegment(10, null, () -> cleanerFuture.complete(null))
                 .free();
         assertTrue(cleanerFuture.isDone());
     }
diff --git a/flink-core/src/test/java/org/apache/flink/types/NormalizableKeyTest.java b/flink-core/src/test/java/org/apache/flink/types/NormalizableKeyTest.java
index 48023ba..bf46147 100644
--- a/flink-core/src/test/java/org/apache/flink/types/NormalizableKeyTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/NormalizableKeyTest.java
@@ -128,7 +128,7 @@ public class NormalizableKeyTest {
             NormalizableKey<T> key1, NormalizableKey<T> key2, int len) {
 
         byte[] normalizedKeys = new byte[32];
-        MemorySegment wrapper = MemorySegmentFactory.wrap(normalizedKeys);
+        MemorySegment wrapper = MemorySegmentFactory.wrapHeapSegment(normalizedKeys);
 
         key1.copyNormalizedKey(wrapper, 0, len);
         key2.copyNormalizedKey(wrapper, len, len);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
index 3f07d71..8a71c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/FileBasedBufferIterator.java
@@ -28,7 +28,7 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
-import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapHeapSegment;
 import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
 import static org.apache.flink.util.IOUtils.closeAll;
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -73,7 +73,7 @@ public class FileBasedBufferIterator implements CloseableIterator<Buffer> {
         offset += bytesRead;
         bytesToRead -= bytesRead;
         return new NetworkBuffer(
-                wrap(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
+                wrapHeapSegment(buffer), FreeingBufferRecycler.INSTANCE, DATA_BUFFER, bytesRead);
     }
 
     private int read(byte[] buffer) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 22a15d8..3a5098c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -275,7 +275,7 @@ public class EventSerializer {
     public static Buffer toBuffer(AbstractEvent event, boolean hasPriority) throws IOException {
         final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
-        MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
+        MemorySegment data = MemorySegmentFactory.wrapHeapSegment(serializedEvent.array());
 
         final Buffer buffer =
                 new NetworkBuffer(
@@ -289,7 +289,7 @@ public class EventSerializer {
             throws IOException {
         final ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(event);
 
-        MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
+        MemorySegment data = MemorySegmentFactory.wrapHeapSegment(serializedEvent.array());
 
         return new BufferConsumer(
                 data, FreeingBufferRecycler.INSTANCE, getDataType(event, hasPriority));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index c0adfdc..33b1d24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -74,7 +74,7 @@ final class NonSpanningWrapper implements DataInputView {
         if (!hasRemaining()) {
             return CloseableIterator.empty();
         }
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(remaining());
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(remaining());
         this.segment.copyTo(position, segment, 0, remaining());
         return singleBufferIterator(segment);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
index 5bba021..5b42b4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapper.java
@@ -48,8 +48,8 @@ import java.util.Random;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
-import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopy;
-import static org.apache.flink.core.memory.MemorySegmentFactory.wrapInt;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapCopyHeapSegment;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapIntHeapSegment;
 import static org.apache.flink.runtime.io.network.api.serialization.NonSpanningWrapper.singleBufferIterator;
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static org.apache.flink.util.CloseableIterator.empty;
@@ -193,7 +193,8 @@ final class SpanningWrapper {
 
     CloseableIterator<Buffer> getUnconsumedSegment() throws IOException {
         if (isReadingLength()) {
-            return singleBufferIterator(wrapCopy(lengthBuffer.array(), 0, lengthBuffer.position()));
+            return singleBufferIterator(
+                    wrapCopyHeapSegment(lengthBuffer.array(), 0, lengthBuffer.position()));
         } else if (isAboveSpillingThreshold()) {
             return createSpilledDataIterator();
         } else if (recordLength == -1) {
@@ -209,13 +210,14 @@ final class SpanningWrapper {
             spillingChannel.force(false);
         }
         return CloseableIterator.flatten(
-                toSingleBufferIterator(wrapInt(recordLength)),
+                toSingleBufferIterator(wrapIntHeapSegment(recordLength)),
                 new FileBasedBufferIterator(
                         spillFile, min(accumulatedRecordBytes, recordLength), fileBufferSize),
                 leftOverData == null
                         ? empty()
                         : toSingleBufferIterator(
-                                wrapCopy(leftOverData.getArray(), leftOverStart, leftOverLimit)));
+                                wrapCopyHeapSegment(
+                                        leftOverData.getArray(), leftOverStart, leftOverLimit)));
     }
 
     private MemorySegment copyDataBuffer() throws IOException {
@@ -227,7 +229,7 @@ final class SpanningWrapper {
         if (leftOverData != null) {
             serializer.write(leftOverData, leftOverStart, leftOverSize);
         }
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(unconsumedSize);
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(unconsumedSize);
         segment.put(0, serializer.getSharedBuffer(), 0, segment.size());
         return segment;
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
index 4a58ad2..b5da775 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
@@ -45,7 +45,8 @@ public class BufferCompressor {
         final byte[] heapBuffer = new byte[2 * bufferSize];
         this.internalBuffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.wrap(heapBuffer), FreeingBufferRecycler.INSTANCE);
+                        MemorySegmentFactory.wrapHeapSegment(heapBuffer),
+                        FreeingBufferRecycler.INSTANCE);
         this.blockCompressor =
                 BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor();
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java
index c6dc89a..c814a7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java
@@ -45,7 +45,8 @@ public class BufferDecompressor {
         final byte[] heapBuffer = new byte[bufferSize];
         this.internalBuffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.wrap(heapBuffer), FreeingBufferRecycler.INSTANCE);
+                        MemorySegmentFactory.wrapHeapSegment(heapBuffer),
+                        FreeingBufferRecycler.INSTANCE);
         this.blockDecompressor =
                 BlockCompressionFactory.createBlockCompressionFactory(factoryName)
                         .getDecompressor();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 6936b7b..37fad6d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -114,7 +114,7 @@ public class NetworkBufferPool
         try {
             for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
                 availableMemorySegments.add(
-                        MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
+                        MemorySegmentFactory.allocateDirectSegment(segmentSize, null));
             }
         } catch (OutOfMemoryError err) {
             int allocated = availableMemorySegments.size();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java
index 36367f4..646f4c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NetworkBufferAllocator.java
@@ -70,7 +70,7 @@ class NetworkBufferAllocator {
      */
     Buffer allocateUnPooledNetworkBuffer(int size, Buffer.DataType dataType) {
         byte[] byteArray = new byte[size];
-        MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
+        MemorySegment memSeg = MemorySegmentFactory.wrapHeapSegment(byteArray);
 
         return new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, dataType);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
index 649a262..a778bf6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtil.java
@@ -91,7 +91,7 @@ public final class BufferReaderWriterUtil {
         memory.position(memory.limit());
         memory.limit(memory.capacity());
 
-        MemorySegment memorySegment = MemorySegmentFactory.wrapOffHeapMemory(buf);
+        MemorySegment memorySegment = MemorySegmentFactory.wrapDirectSegment(buf);
 
         Buffer.DataType dataType =
                 isEvent ? Buffer.DataType.EVENT_BUFFER : Buffer.DataType.DATA_BUFFER;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
index 20890ff..dbfa5f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
@@ -131,8 +131,7 @@ final class FileChannelBoundedData implements BoundedData {
             this.buffers = new ArrayDeque<>(NUM_BUFFERS);
 
             for (int i = 0; i < NUM_BUFFERS; i++) {
-                buffers.addLast(
-                        MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null));
+                buffers.addLast(MemorySegmentFactory.allocateDirectSegment(bufferSize, null));
             }
 
             this.subpartitionView = checkNotNull(subpartitionView);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
index 0a0ab22..2ee5de4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBuffer.java
@@ -315,7 +315,7 @@ public class PartitionSortedBuffer implements SortBuffer {
 
             // allocate a temp buffer for the event if the target buffer is not big enough
             if (bufferDataType.isEvent() && target.size() < length) {
-                target = MemorySegmentFactory.allocateUnpooledSegment(length);
+                target = MemorySegmentFactory.allocateHeapSegment(length);
             }
 
             numBytesCopied +=
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
index 8ed7b75..1d24b70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartition.java
@@ -108,7 +108,7 @@ public class SortMergeResultPartition extends ResultPartition {
 
         this.networkBufferSize = networkBufferSize;
         this.numDataBuffers = new int[numSubpartitions];
-        this.writeBuffer = MemorySegmentFactory.allocateUnpooledOffHeapMemory(networkBufferSize);
+        this.writeBuffer = MemorySegmentFactory.allocateDirectSegment(networkBufferSize);
 
         PartitionedFileWriter fileWriter = null;
         try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
index a085c6d..2896f0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeSubpartitionReader.java
@@ -78,8 +78,7 @@ public class SortMergeSubpartitionReader implements ResultSubpartitionView, Buff
 
         // allocate two pieces of unmanaged segments for data reading
         for (int i = 0; i < NUM_READ_BUFFERS; i++) {
-            this.readBuffers.add(
-                    MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null));
+            this.readBuffers.add(MemorySegmentFactory.allocateDirectSegment(bufferSize, null));
         }
 
         this.fileReader = new PartitionedFileReader(partitionedFile, subpartitionIndex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 82e09a9..f244a13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -237,7 +237,7 @@ public class SingleInputGate extends IndexedInputGate {
 
         this.closeFuture = new CompletableFuture<>();
 
-        this.unpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
+        this.unpooledSegment = MemorySegmentFactory.allocateHeapSegment(segmentSize);
     }
 
     protected PrioritizedDeque<InputChannel> getInputChannelsWithData() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index 82551cd..b9f68e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -45,7 +45,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
-import static org.apache.flink.core.memory.MemorySegmentFactory.allocateOffHeapUnsafeMemory;
+import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnsafeSegment;
 
 /**
  * The memory manager governs the memory that Flink uses for sorting, hashing, caching or off-heap
@@ -245,7 +245,7 @@ public class MemoryManager {
                                     : currentSegmentsForOwner;
                     for (long i = numberOfPages; i > 0; i--) {
                         MemorySegment segment =
-                                allocateOffHeapUnsafeMemory(getPageSize(), owner, pageCleanup);
+                                allocateUnsafeSegment(getPageSize(), owner, pageCleanup);
                         target.add(segment);
                         segmentsForOwner.add(segment);
                     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
index c47caaa..9225619 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateCheckpointWriterTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.runtime.checkpoint.channel;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
@@ -46,7 +46,7 @@ import java.util.stream.IntStream;
 import static java.util.Collections.singletonList;
 import static org.apache.flink.core.fs.Path.fromLocalFile;
 import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
-import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapHeapSegment;
 import static org.apache.flink.runtime.state.CheckpointedStateScope.EXCLUSIVE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -116,7 +116,7 @@ public class ChannelStateCheckpointWriterTest {
                         checkpointStreamFactory.createCheckpointStateOutputStream(EXCLUSIVE));
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        HeapMemorySegment.FACTORY.allocateUnpooledSegment(threshold / 2, null),
+                        MemorySegmentFactory.allocateHeapSegment(threshold / 2, null),
                         FreeingBufferRecycler.INSTANCE);
         writer.writeInput(new InputChannelInfo(1, 2), buffer);
         writer.completeOutput();
@@ -147,7 +147,7 @@ public class ChannelStateCheckpointWriterTest {
         ChannelStateCheckpointWriter writer = createWriter(new ChannelStateWriteResult());
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        HeapMemorySegment.FACTORY.allocateUnpooledSegment(10, null),
+                        MemorySegmentFactory.allocateHeapSegment(10, null),
                         FreeingBufferRecycler.INSTANCE);
         writer.writeInput(new InputChannelInfo(1, 2), buffer);
         assertTrue(buffer.isRecycled());
@@ -234,7 +234,7 @@ public class ChannelStateCheckpointWriterTest {
     private void write(
             ChannelStateCheckpointWriter writer, InputChannelInfo channelInfo, byte[] data)
             throws Exception {
-        MemorySegment segment = wrap(data);
+        MemorySegment segment = wrapHeapSegment(data);
         NetworkBuffer buffer =
                 new NetworkBuffer(
                         segment,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java
index f95acf4..b65d049 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateChunkReaderTest.java
@@ -132,7 +132,7 @@ public class ChannelStateChunkReaderTest {
             serializer.writeData(
                     dataStream,
                     new NetworkBuffer(
-                            MemorySegmentFactory.wrap(new byte[size]),
+                            MemorySegmentFactory.wrapHeapSegment(new byte[size]),
                             FreeingBufferRecycler.INSTANCE,
                             Buffer.DataType.DATA_BUFFER,
                             size));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
index 59134a5..27372a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateSerializerImplTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.runtime.checkpoint.channel;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -91,7 +90,7 @@ public class ChannelStateSerializerImplTest {
         byte[] data = generateData(100);
         BufferBuilder bufferBuilder =
                 new BufferBuilder(
-                        HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null),
+                        MemorySegmentFactory.allocateHeapSegment(data.length, null),
                         FreeingBufferRecycler.INSTANCE);
         BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
 
@@ -112,7 +111,7 @@ public class ChannelStateSerializerImplTest {
     private NetworkBuffer getBuffer(byte[] data) {
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null),
+                        MemorySegmentFactory.allocateHeapSegment(data.length, null),
                         FreeingBufferRecycler.INSTANCE);
         buffer.writeBytes(data);
         return buffer;
@@ -130,7 +129,7 @@ public class ChannelStateSerializerImplTest {
         serializer.writeHeader(out);
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.allocateUnpooledSegment(data.length),
+                        MemorySegmentFactory.allocateHeapSegment(data.length),
                         FreeingBufferRecycler.INSTANCE);
         try {
             buffer.writeBytes(data);
@@ -149,7 +148,7 @@ public class ChannelStateSerializerImplTest {
         assertEquals(data.length, size);
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.allocateUnpooledSegment(data.length),
+                        MemorySegmentFactory.allocateHeapSegment(data.length),
                         FreeingBufferRecycler.INSTANCE);
         try {
             int read = serializer.readData(is, wrap(buffer), size);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java
index bf0639b..75f0765 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java
@@ -78,6 +78,6 @@ public class ChannelStateWriteRequestDispatcherImplTest {
 
     private NetworkBuffer buffer() {
         return new NetworkBuffer(
-                MemorySegmentFactory.allocateUnpooledSegment(10), FreeingBufferRecycler.INSTANCE);
+                MemorySegmentFactory.allocateHeapSegment(10), FreeingBufferRecycler.INSTANCE);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
index 5f7c58f..3f9cb21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java
@@ -99,7 +99,7 @@ public class ChannelStateWriteRequestDispatcherTest {
                 new InputChannelInfo(1, 1),
                 CloseableIterator.ofElement(
                         new NetworkBuffer(
-                                MemorySegmentFactory.allocateUnpooledSegment(1),
+                                MemorySegmentFactory.allocateHeapSegment(1),
                                 FreeingBufferRecycler.INSTANCE),
                         Buffer::recycleBuffer));
     }
@@ -109,7 +109,7 @@ public class ChannelStateWriteRequestDispatcherTest {
                 CHECKPOINT_ID,
                 new ResultSubpartitionInfo(1, 1),
                 new NetworkBuffer(
-                        MemorySegmentFactory.allocateUnpooledSegment(1),
+                        MemorySegmentFactory.allocateHeapSegment(1),
                         FreeingBufferRecycler.INSTANCE));
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 196ac2e..b20c7c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.runtime.checkpoint.channel;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -260,7 +260,7 @@ public class ChannelStateWriterImplTest {
 
     private NetworkBuffer getBuffer() {
         return new NetworkBuffer(
-                HeapMemorySegment.FACTORY.allocateUnpooledSegment(123, null),
+                MemorySegmentFactory.allocateHeapSegment(123, null),
                 FreeingBufferRecycler.INSTANCE);
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java
index f06f3bc..56635f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.java
@@ -346,7 +346,7 @@ public class SequentialChannelStateReaderImplTest {
 
     private NetworkBuffer wrap(byte[] bytes) {
         return new NetworkBuffer(
-                MemorySegmentFactory.wrap(bytes),
+                MemorySegmentFactory.wrapHeapSegment(bytes),
                 FreeingBufferRecycler.INSTANCE,
                 Buffer.DataType.DATA_BUFFER,
                 bytes.length);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 17d191f..5319d9b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -91,7 +91,7 @@ public class AsynchronousBufferFileWriterTest {
 
         Buffer buffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.allocateUnpooledSegment(4096),
+                        MemorySegmentFactory.allocateHeapSegment(4096),
                         FreeingBufferRecycler.INSTANCE);
         try {
             writer.writeBlock(buffer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
index 0da6fe7..eb6e6c7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannelTest.java
@@ -277,7 +277,7 @@ public class AsynchronousFileIOChannelTest {
         try (final IOManagerAsync ioMan = new IOManagerAsync()) {
 
             final int NUM_BLOCKS = 100;
-            final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
+            final MemorySegment seg = MemorySegmentFactory.allocateHeapSegment(32 * 1024);
 
             final AtomicInteger callbackCounter = new AtomicInteger();
             final AtomicBoolean exceptionOccurred = new AtomicBoolean();
@@ -334,7 +334,7 @@ public class AsynchronousFileIOChannelTest {
     private void testExceptionForwardsToClose(
             IOManagerAsync ioMan, final int numBlocks, final int failingBlock) {
         try {
-            MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
+            MemorySegment seg = MemorySegmentFactory.allocateHeapSegment(32 * 1024);
             FileIOChannel.ID channelId = ioMan.createChannel();
 
             BlockChannelWriterWithCallback<MemorySegment> writer =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 064700c..40408ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -159,7 +159,8 @@ public class BufferFileWriterFileSegmentReaderTest {
             fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
 
             Buffer buffer1 =
-                    new NetworkBuffer(MemorySegmentFactory.wrap(buffer.array()), BUFFER_RECYCLER);
+                    new NetworkBuffer(
+                            MemorySegmentFactory.wrapHeapSegment(buffer.array()), BUFFER_RECYCLER);
             buffer1.setSize(fileSegment.getLength());
             currentNumber = verifyBufferFilledWithAscendingNumbers(buffer1, currentNumber);
         }
@@ -185,6 +186,6 @@ public class BufferFileWriterFileSegmentReaderTest {
 
     private Buffer createBuffer() {
         return new NetworkBuffer(
-                MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
+                MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE), BUFFER_RECYCLER);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index e97109b..f3a78cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -203,7 +203,7 @@ public class BufferFileWriterReaderTest {
 
     private Buffer createBuffer() {
         return new NetworkBuffer(
-                MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
+                MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE), BUFFER_RECYCLER);
     }
 
     static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
index c274a60..7e10cee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsyncTest.java
@@ -64,7 +64,7 @@ public class IOManagerAsyncTest {
             final BlockChannelWriter<MemorySegment> writer =
                     this.ioManager.createBlockChannelWriter(channelID);
 
-            MemorySegment memSeg = MemorySegmentFactory.allocateUnpooledSegment(32 * 1024);
+            MemorySegment memSeg = MemorySegmentFactory.allocateHeapSegment(32 * 1024);
 
             for (int i = 0; i < NUM_IOS; i++) {
                 for (int pos = 0; pos < memSeg.size(); pos += 4) {
@@ -105,7 +105,7 @@ public class IOManagerAsyncTest {
         try {
             final List<MemorySegment> memSegs = new ArrayList<MemorySegment>();
             for (int i = 0; i < NUM_SEGS; i++) {
-                memSegs.add(MemorySegmentFactory.allocateUnpooledSegment(32 * 1024));
+                memSegs.add(MemorySegmentFactory.allocateHeapSegment(32 * 1024));
             }
 
             final FileIOChannel.ID channelID = this.ioManager.createChannel();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
index 6fc42feb..9e72a59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/PagedViewsTest.java
@@ -380,7 +380,7 @@ public class PagedViewsTest {
         private final int segmentSize;
 
         private TestOutputView(int segmentSize) {
-            super(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), segmentSize, 0);
+            super(MemorySegmentFactory.allocateHeapSegment(segmentSize), segmentSize, 0);
 
             this.segmentSize = segmentSize;
         }
@@ -389,7 +389,7 @@ public class PagedViewsTest {
         protected MemorySegment nextSegment(MemorySegment current, int positionInCurrent)
                 throws IOException {
             segments.add(new SegmentWithPosition(current, positionInCurrent));
-            return MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
+            return MemorySegmentFactory.allocateHeapSegment(segmentSize);
         }
 
         public void close() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index 01910b5..d2b2f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -295,7 +295,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
     private static Buffer appendLeftOverBytes(Buffer buffer, byte[] leftOverBytes) {
         BufferBuilder bufferBuilder =
                 new BufferBuilder(
-                        MemorySegmentFactory.allocateUnpooledSegment(
+                        MemorySegmentFactory.allocateHeapSegment(
                                 buffer.readableBytes() + leftOverBytes.length),
                         FreeingBufferRecycler.INSTANCE);
         try (BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
index 46719d1..3803a5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningWrapperTest.java
@@ -30,7 +30,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
-import static org.apache.flink.core.memory.MemorySegmentFactory.wrap;
+import static org.apache.flink.core.memory.MemorySegmentFactory.wrapHeapSegment;
 import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static org.junit.Assert.assertArrayEquals;
 
@@ -64,8 +64,8 @@ public class SpanningWrapperTest {
                         recordLen);
         spanningWrapper.transferFrom(wrapNonSpanning(record1, firstChunk), recordLen);
         spanningWrapper.addNextChunkFromMemorySegment(
-                wrap(record1), firstChunk, recordLen - firstChunk + LENGTH_BYTES);
-        spanningWrapper.addNextChunkFromMemorySegment(wrap(record2), 0, record2.length);
+                wrapHeapSegment(record1), firstChunk, recordLen - firstChunk + LENGTH_BYTES);
+        spanningWrapper.addNextChunkFromMemorySegment(wrapHeapSegment(record2), 0, record2.length);
 
         CloseableIterator<Buffer> unconsumedSegment = spanningWrapper.getUnconsumedSegment();
 
@@ -92,7 +92,7 @@ public class SpanningWrapperTest {
 
     private NonSpanningWrapper wrapNonSpanning(byte[] bytes, int len) {
         NonSpanningWrapper nonSpanningWrapper = new NonSpanningWrapper();
-        MemorySegment segment = wrap(bytes);
+        MemorySegment segment = wrapHeapSegment(bytes);
         nonSpanningWrapper.initializeFromMemorySegment(segment, 0, len);
         nonSpanningWrapper
                 .readInt(); // emulate read length performed in getNextRecord to move position
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 6974026..6c86f1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -50,7 +50,7 @@ public abstract class AbstractCollectingResultPartitionWriter extends MockResult
     private void deserializeRecord(ByteBuffer serializedRecord) throws IOException {
         checkArgument(serializedRecord.hasArray());
 
-        MemorySegment segment = MemorySegmentFactory.wrap(serializedRecord.array());
+        MemorySegment segment = MemorySegmentFactory.wrapHeapSegment(serializedRecord.array());
         NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
         buffer.setSize(serializedRecord.remaining());
         deserializeBuffer(buffer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
index c6db226..75343d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -283,7 +283,7 @@ public class BufferBuilderAndConsumerTest {
 
     private static BufferBuilder createBufferBuilder() {
         return new BufferBuilder(
-                MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE),
+                MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE),
                 FreeingBufferRecycler.INSTANCE);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index f08e128..ca628d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -43,7 +43,7 @@ public class BufferBuilderTestUtils {
         checkArgument(size >= dataSize);
         BufferBuilder bufferBuilder =
                 new BufferBuilder(
-                        MemorySegmentFactory.allocateUnpooledSegment(size),
+                        MemorySegmentFactory.allocateHeapSegment(size),
                         FreeingBufferRecycler.INSTANCE);
         return fillBufferBuilder(bufferBuilder, dataSize);
     }
@@ -90,13 +90,13 @@ public class BufferBuilderTestUtils {
 
     public static BufferConsumer createEventBufferConsumer(int size, Buffer.DataType dataType) {
         return new BufferConsumer(
-                MemorySegmentFactory.allocateUnpooledSegment(size),
+                MemorySegmentFactory.allocateHeapSegment(size),
                 FreeingBufferRecycler.INSTANCE,
                 dataType);
     }
 
     public static Buffer buildBufferWithAscendingInts(int bufferSize, int numInts, int nextValue) {
-        final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+        final MemorySegment seg = MemorySegmentFactory.allocateHeapSegment(bufferSize);
         for (int i = 0; i < numInts; i++) {
             seg.putIntLittleEndian(4 * i, nextValue++);
         }
@@ -118,13 +118,13 @@ public class BufferBuilderTestUtils {
     }
 
     public static Buffer buildSomeBuffer(int size) {
-        final MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(size);
+        final MemorySegment seg = MemorySegmentFactory.allocateHeapSegment(size);
         return new NetworkBuffer(seg, MemorySegment::free, Buffer.DataType.DATA_BUFFER, size);
     }
 
     public static BufferBuilder createEmptyBufferBuilder(int bufferSize) {
         return new BufferBuilder(
-                MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+                MemorySegmentFactory.allocateHeapSegment(bufferSize),
                 FreeingBufferRecycler.INSTANCE);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java
index c7dbe04..c787f48 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java
@@ -193,9 +193,9 @@ public class BufferCompressionTest {
     private static Buffer createBufferAndFillWithLongValues(boolean isDirect) {
         MemorySegment segment;
         if (isDirect) {
-            segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
+            segment = MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE);
         } else {
-            segment = MemorySegmentFactory.allocateUnpooledOffHeapMemory(BUFFER_SIZE);
+            segment = MemorySegmentFactory.allocateDirectSegment(BUFFER_SIZE);
         }
         for (int i = 0; i < NUM_LONGS; ++i) {
             segment.putLongLittleEndian(8 * i, i);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLengthTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLengthTest.java
index 7122d30..1101882 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLengthTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferConsumerWithPartialRecordLengthTest.java
@@ -148,7 +148,7 @@ public class BufferConsumerWithPartialRecordLengthTest {
 
     private BufferBuilder createBufferBuilder() {
         return new BufferBuilder(
-                MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE),
+                MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE),
                 FreeingBufferRecycler.INSTANCE);
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
index 6030a4f..0f04760 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
@@ -75,7 +75,7 @@ public class NetworkBufferTest extends AbstractByteBufTest {
     private static NetworkBuffer newBuffer(
             int length, int maxCapacity, boolean isBuffer, BufferRecycler recycler) {
         final MemorySegment segment =
-                MemorySegmentFactory.allocateUnpooledSegment(
+                MemorySegmentFactory.allocateHeapSegment(
                         Math.min(maxCapacity, MAX_CAPACITY_UPPER_BOUND));
 
         Buffer.DataType dataType =
@@ -127,7 +127,7 @@ public class NetworkBufferTest extends AbstractByteBufTest {
     }
 
     private static void testGetMemorySegment(boolean isBuffer) {
-        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+        final MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(1024);
         Buffer.DataType dataType =
                 isBuffer ? Buffer.DataType.DATA_BUFFER : Buffer.DataType.EVENT_BUFFER;
         NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
index ff66ca7..73c1c3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -48,7 +48,7 @@ public class ReadOnlySlicedBufferTest {
 
     @Before
     public void setUp() throws Exception {
-        final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
+        final MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE);
         buffer =
                 new NetworkBuffer(
                         segment, FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, 0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
index 67f9301..9d67b3a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientDecoderDelegateTest.java
@@ -238,7 +238,7 @@ public class NettyMessageClientDecoderDelegateTest extends TestLogger {
     }
 
     private Buffer createDataBuffer(int size, Buffer.DataType dataType) {
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(size);
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(size);
         NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, dataType);
         for (int i = 0; i < size / 4; ++i) {
             buffer.writeInt(i);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
index 77de8359..8584e53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
@@ -159,7 +159,7 @@ public class NettyMessageClientSideSerializationTest extends TestLogger {
 
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE),
+                        MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE),
                         FreeingBufferRecycler.INSTANCE);
         for (int i = 0; i < BUFFER_SIZE; i += 8) {
             buffer.writeLong(i);
@@ -204,7 +204,7 @@ public class NettyMessageClientSideSerializationTest extends TestLogger {
     }
 
     private Buffer decompress(Buffer buffer) {
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE);
         Buffer compressedBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
         buffer.asByteBuf().readBytes(compressedBuffer.asByteBuf(), buffer.readableBytes());
         compressedBuffer.setCompressed(true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
index 37dad72..3cd8e0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionWriteReadTest.java
@@ -221,7 +221,7 @@ public class BoundedBlockingSubpartitionWriteReadTest {
 
     private static void writeLongs(BoundedBlockingSubpartition partition, long nums)
             throws IOException {
-        final MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
+        final MemorySegment memory = MemorySegmentFactory.allocateHeapSegment(BUFFER_SIZE);
 
         long l = 0;
         while (nums > 0) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java
index ce27237..f6e7d2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BufferReaderWriterUtilTest.java
@@ -111,7 +111,7 @@ public class BufferReaderWriterUtilTest {
         final FileChannel fc = tmpFileChannel();
         final Buffer buffer = createTestBuffer();
         final MemorySegment readBuffer =
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null);
+                MemorySegmentFactory.allocateDirectSegment(buffer.getSize(), null);
 
         BufferReaderWriterUtil.writeToByteChannel(
                 fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray());
@@ -132,7 +132,7 @@ public class BufferReaderWriterUtilTest {
         final FileChannel fc = tmpFileChannel();
         final Buffer buffer = createTestBuffer();
         final MemorySegment readBuffer =
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null);
+                MemorySegmentFactory.allocateDirectSegment(buffer.getSize(), null);
 
         BufferReaderWriterUtil.writeToByteChannel(
                 fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray());
@@ -156,7 +156,7 @@ public class BufferReaderWriterUtilTest {
         final FileChannel fc = tmpFileChannel();
         final Buffer buffer = createTestBuffer();
         final MemorySegment readBuffer =
-                MemorySegmentFactory.allocateUnpooledOffHeapMemory(buffer.getSize(), null);
+                MemorySegmentFactory.allocateDirectSegment(buffer.getSize(), null);
 
         BufferReaderWriterUtil.writeToByteChannel(
                 fc, buffer, BufferReaderWriterUtil.allocatedWriteBufferArray());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 17d8336..355dde8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -236,8 +236,7 @@ public class InputChannelTestUtils {
 
         @Override
         public Collection<MemorySegment> requestMemorySegments(int numberOfSegmentsToRequest) {
-            return Collections.singletonList(
-                    MemorySegmentFactory.allocateUnpooledSegment(pageSize));
+            return Collections.singletonList(MemorySegmentFactory.allocateHeapSegment(pageSize));
         }
 
         @Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
index ff3bc43..f29ddec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionSortedBufferTest.java
@@ -98,7 +98,7 @@ public class PartitionSortedBufferTest {
 
         // read all data from the sort buffer
         while (sortBuffer.hasRemaining()) {
-            MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+            MemorySegment readBuffer = MemorySegmentFactory.allocateHeapSegment(bufferSize);
             SortBuffer.BufferWithChannel bufferAndChannel = sortBuffer.copyIntoSegment(readBuffer);
             int subpartition = bufferAndChannel.getChannelIndex();
             buffersRead[subpartition].add(bufferAndChannel.getBuffer());
@@ -191,7 +191,7 @@ public class PartitionSortedBufferTest {
 
     private void checkReadResult(
             SortBuffer sortBuffer, ByteBuffer expectedBuffer, int expectedChannel, int bufferSize) {
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(bufferSize);
         SortBuffer.BufferWithChannel bufferWithChannel = sortBuffer.copyIntoSegment(segment);
         assertEquals(expectedChannel, bufferWithChannel.getChannelIndex());
         assertEquals(expectedBuffer, bufferWithChannel.getBuffer().getNioBufferReadable());
@@ -280,7 +280,7 @@ public class PartitionSortedBufferTest {
         sortBuffer.append(ByteBuffer.allocate(1), 0, Buffer.DataType.DATA_BUFFER);
 
         assertTrue(sortBuffer.hasRemaining());
-        sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+        sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateHeapSegment(bufferSize));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -295,7 +295,7 @@ public class PartitionSortedBufferTest {
         sortBuffer.release();
         assertFalse(sortBuffer.hasRemaining());
 
-        sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+        sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateHeapSegment(bufferSize));
     }
 
     @Test(expected = IllegalStateException.class)
@@ -306,7 +306,7 @@ public class PartitionSortedBufferTest {
         sortBuffer.finish();
 
         assertFalse(sortBuffer.hasRemaining());
-        sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateUnpooledSegment(bufferSize));
+        sortBuffer.copyIntoSegment(MemorySegmentFactory.allocateHeapSegment(bufferSize));
     }
 
     @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
index 4c4ab33..040f0b3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionedFileWriteReadTest.java
@@ -92,7 +92,7 @@ public class PartitionedFileWriteReadTest {
             PartitionedFileReader fileReader =
                     new PartitionedFileReader(partitionedFile, subpartition);
             while (fileReader.hasRemaining()) {
-                MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+                MemorySegment readBuffer = MemorySegmentFactory.allocateHeapSegment(bufferSize);
                 Buffer buffer = fileReader.readBuffer(readBuffer, (buf) -> {});
                 buffersRead[subpartition].add(buffer);
             }
@@ -137,7 +137,7 @@ public class PartitionedFileWriteReadTest {
             PartitionedFileReader fileReader =
                     new PartitionedFileReader(partitionedFile, subpartition);
             while (fileReader.hasRemaining()) {
-                MemorySegment readBuffer = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+                MemorySegment readBuffer = MemorySegmentFactory.allocateHeapSegment(bufferSize);
                 Buffer buffer = checkNotNull(fileReader.readBuffer(readBuffer, (buf) -> {}));
                 assertBufferEquals(checkNotNull(subpartitionBuffers[subpartition].poll()), buffer);
             }
@@ -158,14 +158,15 @@ public class PartitionedFileWriteReadTest {
 
         int dataSize = random.nextInt(bufferSize) + 1;
         byte[] data = new byte[dataSize];
-        return new NetworkBuffer(MemorySegmentFactory.wrap(data), (buf) -> {}, dataType, dataSize);
+        return new NetworkBuffer(
+                MemorySegmentFactory.wrapHeapSegment(data), (buf) -> {}, dataType, dataSize);
     }
 
     @Test(expected = IllegalStateException.class)
     public void testNotWriteDataOfTheSameSubpartitionTogether() throws Exception {
         PartitionedFileWriter partitionedFileWriter = createPartitionedFileWriter(2);
         try {
-            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+            MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(1024);
 
             NetworkBuffer buffer1 = new NetworkBuffer(segment, (buf) -> {});
             partitionedFileWriter.writeBuffer(buffer1, 1);
@@ -184,7 +185,7 @@ public class PartitionedFileWriteReadTest {
     public void testWriteFinishedPartitionedFile() throws Exception {
         PartitionedFileWriter partitionedFileWriter = createAndFinishPartitionedFileWriter();
 
-        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+        MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(1024);
         NetworkBuffer buffer = new NetworkBuffer(segment, (buf) -> {});
 
         partitionedFileWriter.writeBuffer(buffer, 0);
@@ -200,14 +201,14 @@ public class PartitionedFileWriteReadTest {
     public void testReadClosedPartitionedFile() throws Exception {
         PartitionedFileReader partitionedFileReader = createAndClosePartitionedFiledReader();
 
-        MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(1024);
+        MemorySegment target = MemorySegmentFactory.allocateHeapSegment(1024);
         partitionedFileReader.readBuffer(target, FreeingBufferRecycler.INSTANCE);
     }
 
     @Test
     public void testReadEmptyPartitionedFile() throws Exception {
         try (PartitionedFileReader partitionedFileReader = createPartitionedFiledReader()) {
-            MemorySegment target = MemorySegmentFactory.allocateUnpooledSegment(1024);
+            MemorySegment target = MemorySegmentFactory.allocateHeapSegment(1024);
             assertNull(partitionedFileReader.readBuffer(target, FreeingBufferRecycler.INSTANCE));
         }
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8f5fee5..aeeaaef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -150,7 +150,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
                         }
 
                         MemorySegment segment =
-                                MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+                                MemorySegmentFactory.allocateHeapSegment(bufferSize);
 
                         int next = numberOfBuffers * (bufferSize / Integer.BYTES);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
index 74dc627..7aebf70 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionTest.java
@@ -138,7 +138,7 @@ public class SortMergeResultPartitionTest {
                 int numBytes = buffer.readableBytes();
                 numBytesRead[subpartition] += numBytes;
 
-                MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(numBytes);
+                MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(numBytes);
                 segment.put(0, buffer.getNioBufferReadable(), numBytes);
                 buffersRead[subpartition].add(
                         new NetworkBuffer(segment, (buf) -> {}, buffer.getDataType(), numBytes));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 75d694a..1467d14 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -255,7 +255,7 @@ public class SingleInputGateTest extends InputGateTestBase {
                 new SingleInputGateBuilder().setBufferDecompressor(decompressor).build()) {
             TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
 
-            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
+            MemorySegment segment = MemorySegmentFactory.allocateHeapSegment(bufferSize);
             for (int i = 0; i < bufferSize; i += 8) {
                 segment.putLongLittleEndian(i, i);
             }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index e1877bf..7855693 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -58,7 +58,7 @@ public class TestBufferFactory {
 
         numberOfCreatedBuffers++;
         return new NetworkBuffer(
-                MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
+                MemorySegmentFactory.allocateHeapSegment(bufferSize), bufferRecycler);
     }
 
     public synchronized int getNumberOfCreatedBuffers() {
@@ -89,7 +89,7 @@ public class TestBufferFactory {
      */
     public static Buffer createBuffer(int bufferSize, int dataSize) {
         return new NetworkBuffer(
-                MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+                MemorySegmentFactory.allocateHeapSegment(bufferSize),
                 RECYCLER,
                 Buffer.DataType.DATA_BUFFER,
                 dataSize);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
index 2b8808a..48560c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -74,7 +74,8 @@ public class TestSubpartitionProducer implements Callable<Boolean> {
             BufferAndChannel bufferAndChannel;
 
             while ((bufferAndChannel = source.getNextBuffer()) != null) {
-                MemorySegment segment = MemorySegmentFactory.wrap(bufferAndChannel.getBuffer());
+                MemorySegment segment =
+                        MemorySegmentFactory.wrapHeapSegment(bufferAndChannel.getBuffer());
                 subpartition.add(
                         new BufferConsumer(
                                 segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
index 8301a35..7edcd60 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/memory/MemorySegmentSimpleTest.java
@@ -553,7 +553,7 @@ public class MemorySegmentSimpleTest {
     @Test
     public void testByteBufferWrapping() {
         try {
-            MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(1024);
+            MemorySegment seg = MemorySegmentFactory.allocateHeapSegment(1024);
 
             ByteBuffer buf1 = seg.wrap(13, 47);
             assertEquals(13, buf1.position());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
index 7e7c9f0..f2ba6ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/CompactingHashTableTest.java
@@ -521,7 +521,7 @@ public class CompactingHashTableTest extends MutableHashTableTestBase {
     private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
         ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
         for (int i = 0; i < numSegments; i++) {
-            list.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
+            list.add(MemorySegmentFactory.allocateHeapSegment(segmentSize));
         }
         return list;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
index c3b9a78..9873328 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTablePerformanceComparison.java
@@ -308,7 +308,7 @@ public class HashTablePerformanceComparison {
         List<MemorySegment> memory = new ArrayList<MemorySegment>();
 
         for (int i = 0; i < numPages; i++) {
-            memory.add(MemorySegmentFactory.allocateUnpooledSegment(pageSize));
+            memory.add(MemorySegmentFactory.allocateHeapSegment(pageSize));
         }
 
         return memory;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
index 40b1da2..179ff78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java
@@ -274,7 +274,7 @@ public class HashTableTest {
     private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
         ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
         for (int i = 0; i < numSegments; i++) {
-            list.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
+            list.add(MemorySegmentFactory.allocateHeapSegment(segmentSize));
         }
         return list;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest.java
index d5d7b51..ed44ca6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/InPlaceMutableHashTableTest.java
@@ -489,7 +489,7 @@ public class InPlaceMutableHashTableTest extends MutableHashTableTestBase {
         List<MemorySegment> memory = new ArrayList<>();
 
         for (int i = 0; i < numPages; i++) {
-            memory.add(MemorySegmentFactory.allocateUnpooledSegment(pageSize));
+            memory.add(MemorySegmentFactory.allocateHeapSegment(pageSize));
         }
 
         return memory;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTableTestBase.java
index cf3b8c4..606e4d2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTableTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTableTestBase.java
@@ -521,7 +521,7 @@ public abstract class MutableHashTableTestBase {
     private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
         ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
         for (int i = 0; i < numSegments; i++) {
-            list.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
+            list.add(MemorySegmentFactory.allocateHeapSegment(segmentSize));
         }
         return list;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
index 4323aaf..2427fca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
@@ -38,7 +38,7 @@ public class BitSetTest {
 
     public BitSetTest(int byteSize) {
         this.byteSize = byteSize;
-        memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+        memorySegment = MemorySegmentFactory.allocateHeapSegment(byteSize);
     }
 
     @Before
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
index a2e185d..e280b12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BloomFilterTest.java
@@ -39,7 +39,7 @@ public class BloomFilterTest {
         int bitsSize = BloomFilter.optimalNumOfBits(INPUT_SIZE, FALSE_POSITIVE_PROBABILITY);
         bitsSize = bitsSize + (Long.SIZE - (bitsSize % Long.SIZE));
         int byteSize = bitsSize >>> 3;
-        MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+        MemorySegment memorySegment = MemorySegmentFactory.allocateHeapSegment(byteSize);
         bloomFilter = new BloomFilter(INPUT_SIZE, byteSize);
         bloomFilter.setBitsLocation(memorySegment, 0);
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index dc0e934..50ca53f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
@@ -300,7 +300,7 @@ public class ChannelPersistenceITCase {
     private static Buffer wrapWithBuffer(byte[] data) {
         NetworkBuffer buffer =
                 new NetworkBuffer(
-                        HeapMemorySegment.FACTORY.allocateUnpooledSegment(data.length, null),
+                        MemorySegmentFactory.allocateHeapSegment(data.length, null),
                         FreeingBufferRecycler.INSTANCE);
         buffer.writeBytes(data);
         return buffer;
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
index 318d205..7a9e9b1 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMap.java
@@ -1087,7 +1087,7 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S
         int offsetInSegment = node.nodeOffset;
 
         int valueLen = SkipListUtils.getValueLen(segment, offsetInSegment);
-        MemorySegment valueSegment = MemorySegmentFactory.allocateUnpooledSegment(valueLen);
+        MemorySegment valueSegment = MemorySegmentFactory.allocateHeapSegment(valueLen);
         segment.copyTo(
                 offsetInSegment + SkipListUtils.getValueMetaLen(), valueSegment, 0, valueLen);
 
@@ -1465,7 +1465,7 @@ public final class CopyOnWriteSkipListStateMap<K, N, S> extends StateMap<K, N, S
             int keyLen = SkipListUtils.getKeyLen(segment, offsetInSegment);
             int keyDataOffset = offsetInSegment + SkipListUtils.getKeyDataOffset(level);
 
-            MemorySegment nextKeySegment = MemorySegmentFactory.allocateUnpooledSegment(keyLen);
+            MemorySegment nextKeySegment = MemorySegmentFactory.allocateHeapSegment(keyLen);
             segment.copyTo(keyDataOffset, nextKeySegment, 0, keyLen);
             this.nextKeySegment = nextKeySegment;
             nextKeyOffset = 0;
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
index 7dbeb65..3908ace 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListKeySerializer.java
@@ -84,7 +84,7 @@ class SkipListKeySerializer<K, N> {
         }
 
         final byte[] result = outputStream.toByteArray();
-        final MemorySegment segment = MemorySegmentFactory.wrap(result);
+        final MemorySegment segment = MemorySegmentFactory.wrapHeapSegment(result);
 
         // set length of namespace and key
         segment.putInt(0, keyStartPos - Integer.BYTES);
@@ -142,13 +142,13 @@ class SkipListKeySerializer<K, N> {
     Tuple2<byte[], byte[]> getSerializedKeyAndNamespace(MemorySegment memorySegment, int offset) {
         // read namespace
         int namespaceLen = memorySegment.getInt(offset);
-        MemorySegment namespaceSegment = MemorySegmentFactory.allocateUnpooledSegment(namespaceLen);
+        MemorySegment namespaceSegment = MemorySegmentFactory.allocateHeapSegment(namespaceLen);
         memorySegment.copyTo(offset + Integer.BYTES, namespaceSegment, 0, namespaceLen);
 
         // read key
         int keyOffset = offset + Integer.BYTES + namespaceLen;
         int keyLen = memorySegment.getInt(keyOffset);
-        MemorySegment keySegment = MemorySegmentFactory.allocateUnpooledSegment(keyLen);
+        MemorySegment keySegment = MemorySegmentFactory.allocateHeapSegment(keyLen);
         memorySegment.copyTo(keyOffset + Integer.BYTES, keySegment, 0, keyLen);
 
         return Tuple2.of(keySegment.getArray(), namespaceSegment.getArray());
@@ -166,6 +166,6 @@ class SkipListKeySerializer<K, N> {
     }
 
     MemorySegment serializeNamespaceToSegment(N namespace) {
-        return MemorySegmentFactory.wrap(serializeNamespace(namespace));
+        return MemorySegmentFactory.wrapHeapSegment(serializeNamespace(namespace));
     }
 }
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
index f492c41..419558b 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/main/java/org/apache/flink/runtime/state/heap/SkipListUtils.java
@@ -385,7 +385,7 @@ public class SkipListUtils {
      * @param value value data.
      */
     public static void putValueData(MemorySegment memorySegment, int offset, byte[] value) {
-        MemorySegment valueSegment = MemorySegmentFactory.wrap(value);
+        MemorySegment valueSegment = MemorySegmentFactory.wrapHeapSegment(value);
         valueSegment.copyTo(0, memorySegment, offset + getValueMetaLen(), value.length);
     }
 
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapBasicOpTest.java b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapBasicOpTest.java
index 46a71df..17a7e90 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapBasicOpTest.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/CopyOnWriteSkipListStateMapBasicOpTest.java
@@ -318,7 +318,7 @@ public class CopyOnWriteSkipListStateMapBasicOpTest extends TestLogger {
         byte[] keyBytes = skipListKeySerializer.serialize(key, namespace);
         byte[] constructedKeyBytes = new byte[keyBytes.length + 1];
         System.arraycopy(keyBytes, 0, constructedKeyBytes, 1, keyBytes.length);
-        MemorySegment keySegment = MemorySegmentFactory.wrap(constructedKeyBytes);
+        MemorySegment keySegment = MemorySegmentFactory.wrapHeapSegment(constructedKeyBytes);
         int keyLen = keyBytes.length;
         byte[] value = skipListValueSerializer.serialize(valueString);
         stateMap.putValue(keySegment, 1, keyLen, value, false);
@@ -479,7 +479,7 @@ public class CopyOnWriteSkipListStateMapBasicOpTest extends TestLogger {
         SkipListKeySerializer<Integer, Long> skipListKeySerializer =
                 new SkipListKeySerializer<>(IntSerializer.INSTANCE, LongSerializer.INSTANCE);
         byte[] namespaceBytes = skipListKeySerializer.serializeNamespace(namespace);
-        MemorySegment namespaceSegment = MemorySegmentFactory.wrap(namespaceBytes);
+        MemorySegment namespaceSegment = MemorySegmentFactory.wrapHeapSegment(namespaceBytes);
         Iterator<Long> iterator =
                 stateMap.new NamespaceNodeIterator(namespaceSegment, 0, namespaceBytes.length);
         while (iterator.hasNext()) {
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListKeyComparatorTest.java b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListKeyComparatorTest.java
index 6c460e9..0534436 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListKeyComparatorTest.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListKeyComparatorTest.java
@@ -175,8 +175,10 @@ public class SkipListKeyComparatorTest extends TestLogger {
             N namespace1,
             K key2,
             N namespace2) {
-        MemorySegment b1 = MemorySegmentFactory.wrap(keySerializer.serialize(key1, namespace1));
-        MemorySegment b2 = MemorySegmentFactory.wrap(keySerializer.serialize(key2, namespace2));
+        MemorySegment b1 =
+                MemorySegmentFactory.wrapHeapSegment(keySerializer.serialize(key1, namespace1));
+        MemorySegment b2 =
+                MemorySegmentFactory.wrapHeapSegment(keySerializer.serialize(key2, namespace2));
         return SkipListKeyComparator.compareTo(b1, 0, b2, 0);
     }
 
@@ -189,7 +191,11 @@ public class SkipListKeyComparatorTest extends TestLogger {
                 skipListKeySerializerForNamespaceCompare.serialize(
                         key, convertStringToByteArray(targetNamespace));
         return SkipListKeyComparator.compareNamespaceAndNode(
-                MemorySegmentFactory.wrap(n), 0, n.length, MemorySegmentFactory.wrap(k), 0);
+                MemorySegmentFactory.wrapHeapSegment(n),
+                0,
+                n.length,
+                MemorySegmentFactory.wrapHeapSegment(k),
+                0);
     }
 
     private byte[] convertStringToByteArray(@Nonnull String str) {
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java
index aff4eff..c6a93f3 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListSerializerTest.java
@@ -65,7 +65,7 @@ public class SkipListSerializerTest extends TestLogger {
         int offset = 10;
         byte[] data = new byte[10 + skipListKey.length];
         System.arraycopy(skipListKey, 0, data, offset, skipListKey.length);
-        MemorySegment skipListKeySegment = MemorySegmentFactory.wrap(data);
+        MemorySegment skipListKeySegment = MemorySegmentFactory.wrapHeapSegment(data);
         assertEquals(
                 key,
                 skipListKeySerializer.deserializeKey(
@@ -106,7 +106,7 @@ public class SkipListSerializerTest extends TestLogger {
         assertEquals(
                 state,
                 skipListValueSerializer.deserializeState(
-                        MemorySegmentFactory.wrap(data), offset, value.length));
+                        MemorySegmentFactory.wrapHeapSegment(data), offset, value.length));
     }
 
     private <T> T deserialize(TypeSerializer<T> serializer, byte[] data) throws IOException {
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListUtilsTest.java b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListUtilsTest.java
index 679fec0..96acb37 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListUtilsTest.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/SkipListUtilsTest.java
@@ -43,7 +43,7 @@ public class SkipListUtilsTest extends TestLogger {
             int totalKeySpaceLen = keyMetaLen + keyLen;
             int offset = 100;
             MemorySegment segment =
-                    MemorySegmentFactory.allocateUnpooledSegment(totalKeySpaceLen + offset);
+                    MemorySegmentFactory.allocateHeapSegment(totalKeySpaceLen + offset);
             putKeySpace(keySpace, segment, offset);
             verifyGetKeySpace(keySpace, segment, offset);
         }
@@ -58,7 +58,7 @@ public class SkipListUtilsTest extends TestLogger {
             int totalValueSpaceLen = valueMetaLen + valueLen;
             int offset = 100;
             MemorySegment segment =
-                    MemorySegmentFactory.allocateUnpooledSegment(totalValueSpaceLen + offset);
+                    MemorySegmentFactory.allocateHeapSegment(totalValueSpaceLen + offset);
             putValueSpace(valueSpace, segment, offset);
             verifyGetValueSpace(valueSpace, segment, offset);
         }
@@ -98,7 +98,7 @@ public class SkipListUtilsTest extends TestLogger {
         SkipListUtils.putKeyData(
                 memorySegment,
                 offset,
-                MemorySegmentFactory.wrap(keySpace.keyData),
+                MemorySegmentFactory.wrapHeapSegment(keySpace.keyData),
                 0,
                 keySpace.keyData.length,
                 keySpace.level);
@@ -122,7 +122,7 @@ public class SkipListUtilsTest extends TestLogger {
                     SkipListUtils.getPrevIndexNode(memorySegment, offset, keySpace.level, i));
         }
         int keyDataOffset = SkipListUtils.getKeyDataOffset(keySpace.level);
-        MemorySegment keyDataSegment = MemorySegmentFactory.wrap(keySpace.keyData);
+        MemorySegment keyDataSegment = MemorySegmentFactory.wrapHeapSegment(keySpace.keyData);
         assertEquals(
                 0,
                 memorySegment.compare(
@@ -157,7 +157,7 @@ public class SkipListUtilsTest extends TestLogger {
                 SkipListUtils.getNextValuePointer(memorySegment, offset));
         assertEquals(valueSpace.valueData.length, SkipListUtils.getValueLen(memorySegment, offset));
         int valueDataOffset = SkipListUtils.getValueMetaLen();
-        MemorySegment valueDataSegment = MemorySegmentFactory.wrap(valueSpace.valueData);
+        MemorySegment valueDataSegment = MemorySegmentFactory.wrapHeapSegment(valueSpace.valueData);
         assertEquals(
                 0,
                 memorySegment.compare(
diff --git a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/TestAllocator.java b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/TestAllocator.java
index fbfd464..db3264a 100644
--- a/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/TestAllocator.java
+++ b/flink-state-backends/flink-statebackend-heap-spillable/src/test/java/org/apache/flink/runtime/state/heap/TestAllocator.java
@@ -121,7 +121,7 @@ public class TestAllocator extends TestLogger implements Allocator {
             this.offset = 14;
             this.chunkId = chunkId;
             this.size = size + offset;
-            this.segment = MemorySegmentFactory.allocateUnpooledSegment(size);
+            this.segment = MemorySegmentFactory.allocateHeapSegment(size);
         }
 
         @Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
index f777aa6..64a8107 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/checkpointing/CheckpointBarrierTrackerTest.java
@@ -625,7 +625,7 @@ public class CheckpointBarrierTrackerTest {
     private static BufferOrEvent createBuffer(int channel) {
         return new BufferOrEvent(
                 new NetworkBuffer(
-                        MemorySegmentFactory.wrap(new byte[] {1, 2}),
+                        MemorySegmentFactory.wrapHeapSegment(new byte[] {1, 2}),
                         FreeingBufferRecycler.INSTANCE),
                 new InputChannelInfo(0, channel));
     }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
index 476a89a..3b3abdc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryArrayData.java
@@ -518,7 +518,7 @@ public final class BinaryArrayData extends BinarySection implements ArrayData, T
 
     public BinaryArrayData copy(BinaryArrayData reuse) {
         byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
-        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+        reuse.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, sizeInBytes);
         return reuse;
     }
 
@@ -579,7 +579,7 @@ public final class BinaryArrayData extends BinarySection implements ArrayData, T
                 arr, offset, data, BYTE_ARRAY_BASE_OFFSET + headerInBytes, valueRegionInBytes);
 
         BinaryArrayData result = new BinaryArrayData();
-        result.pointTo(MemorySegmentFactory.wrap(data), 0, (int) totalSize);
+        result.pointTo(MemorySegmentFactory.wrapHeapSegment(data), 0, (int) totalSize);
         return result;
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
index d8f74d3..f10893b 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryMapData.java
@@ -92,7 +92,7 @@ public final class BinaryMapData extends BinarySection implements MapData {
 
     public BinaryMapData copy(BinaryMapData reuse) {
         byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
-        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+        reuse.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, sizeInBytes);
         return reuse;
     }
 
@@ -108,7 +108,7 @@ public final class BinaryMapData extends BinarySection implements MapData {
     public static BinaryMapData valueOf(BinaryArrayData key, BinaryArrayData value) {
         checkArgument(key.segments.length == 1 && value.getSegments().length == 1);
         byte[] bytes = new byte[4 + key.sizeInBytes + value.sizeInBytes];
-        MemorySegment segment = MemorySegmentFactory.wrap(bytes);
+        MemorySegment segment = MemorySegmentFactory.wrapHeapSegment(bytes);
         segment.putInt(0, key.sizeInBytes);
         key.getSegments()[0].copyTo(key.getOffset(), segment, 4, key.sizeInBytes);
         value.getSegments()[0].copyTo(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
index 5446797..b233585 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRawValueData.java
@@ -112,7 +112,9 @@ public final class BinaryRawValueData<T> extends LazyBinaryFormat<T> implements
         try {
             byte[] bytes = InstantiationUtil.serializeToByteArray(serializer, javaObject);
             return new BinarySection(
-                    new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)},
+                    0,
+                    bytes.length);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -141,6 +143,8 @@ public final class BinaryRawValueData<T> extends LazyBinaryFormat<T> implements
      */
     public static <T> BinaryRawValueData<T> fromBytes(byte[] bytes, int offset, int numBytes) {
         return new BinaryRawValueData<>(
-                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, offset, numBytes);
+                new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)},
+                offset,
+                numBytes);
     }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
index 5a7b124..4697da3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryRowData.java
@@ -420,7 +420,7 @@ public final class BinaryRowData extends BinarySection implements RowData, Typed
 
     private BinaryRowData copyInternal(BinaryRowData reuse) {
         byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
-        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+        reuse.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, sizeInBytes);
         return reuse;
     }
 
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
index 3962587..fe754bc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
@@ -92,7 +92,9 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements
      */
     public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) {
         return new BinaryStringData(
-                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, offset, numBytes);
+                new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)},
+                offset,
+                numBytes);
     }
 
     /** Creates a {@link BinaryStringData} instance that contains `length` spaces. */
@@ -350,7 +352,7 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements
 
         byte[] bytes = StringUtf8Utils.encodeUTF8(javaObject);
         return new BinarySection(
-                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+                new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)}, 0, bytes.length);
     }
 
     /** Copy a new {@code BinaryStringData}. */
@@ -360,7 +362,7 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements
                 BinarySegmentUtils.copyToBytes(
                         binarySection.segments, binarySection.offset, binarySection.sizeInBytes);
         return new BinaryStringData(
-                new MemorySegment[] {MemorySegmentFactory.wrap(copy)},
+                new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(copy)},
                 0,
                 binarySection.sizeInBytes,
                 javaObject);
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
index 5dc75d9..def9b5a 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/NestedRowData.java
@@ -320,7 +320,7 @@ public final class NestedRowData extends BinarySection implements RowData, Typed
 
     private NestedRowData copyInternal(NestedRowData reuse) {
         byte[] bytes = BinarySegmentUtils.copyToBytes(segments, offset, sizeInBytes);
-        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, sizeInBytes);
+        reuse.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, sizeInBytes);
         return reuse;
     }
 
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
index 8a26c3e..5fa0382 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/codegen/SortCodeGeneratorTest.java
@@ -457,7 +457,7 @@ public class SortCodeGeneratorTest {
     private void testInner() throws Exception {
         List<MemorySegment> segments = new ArrayList<>();
         for (int i = 0; i < 100; i++) {
-            segments.add(MemorySegmentFactory.wrap(new byte[32768]));
+            segments.add(MemorySegmentFactory.wrapHeapSegment(new byte[32768]));
         }
 
         Tuple2<NormalizedKeyComputer, RecordComparator> tuple2 =
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java
index d066fd9..af6ecd1 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/binary/BinaryRowDataUtil.java
@@ -35,7 +35,7 @@ public class BinaryRowDataUtil {
     static {
         int size = EMPTY_ROW.getFixedLengthPartSize();
         byte[] bytes = new byte[size];
-        EMPTY_ROW.pointTo(MemorySegmentFactory.wrap(bytes), 0, size);
+        EMPTY_ROW.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, size);
     }
 
     public static boolean byteArrayEquals(byte[] left, byte[] right, int length) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
index 2f44c19..7d54e06 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/AbstractBinaryWriter.java
@@ -301,7 +301,9 @@ abstract class AbstractBinaryWriter implements BinaryWriter {
         if (newCapacity - minCapacity < 0) {
             newCapacity = minCapacity;
         }
-        segment = MemorySegmentFactory.wrap(Arrays.copyOf(segment.getArray(), newCapacity));
+        segment =
+                MemorySegmentFactory.wrapHeapSegment(
+                        Arrays.copyOf(segment.getArray(), newCapacity));
         afterGrow();
     }
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
index 86d05f2..a0d1fe6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryArrayWriter.java
@@ -42,7 +42,7 @@ public final class BinaryArrayWriter extends AbstractBinaryWriter {
         this.cursor = fixedSize;
         this.numElements = numElements;
 
-        this.segment = MemorySegmentFactory.wrap(new byte[fixedSize]);
+        this.segment = MemorySegmentFactory.wrapHeapSegment(new byte[fixedSize]);
         this.segment.putInt(0, numElements);
         this.array = array;
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
index ef1d22d..5eda884 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/writer/BinaryRowWriter.java
@@ -40,7 +40,7 @@ public final class BinaryRowWriter extends AbstractBinaryWriter {
         this.fixedSize = row.getFixedLengthPartSize();
         this.cursor = fixedSize;
 
-        this.segment = MemorySegmentFactory.wrap(new byte[fixedSize + initialSize]);
+        this.segment = MemorySegmentFactory.wrapHeapSegment(new byte[fixedSize + initialSize]);
         this.row = row;
         this.row.pointTo(segment, 0, segment.size());
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatDeserializationSchema.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatDeserializationSchema.java
index e055db2..66eaa2e 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatDeserializationSchema.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatDeserializationSchema.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RawValueData;
 import org.apache.flink.table.data.RowData;
@@ -296,7 +297,7 @@ public class RawFormatDeserializationSchema implements DeserializationSchema<Row
 
         @Override
         public Object convert(byte[] data) {
-            HeapMemorySegment segment = HeapMemorySegment.FACTORY.wrap(data);
+            HeapMemorySegment segment = MemorySegmentFactory.wrapHeapSegment(data);
             return innerConverter.convert(segment);
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatSerializationSchema.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatSerializationSchema.java
index 894e8c1..016c918 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatSerializationSchema.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatSerializationSchema.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RawType;
@@ -216,7 +217,7 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
 
         @Override
         public byte[] convert(RowData row) {
-            HeapMemorySegment segment = HeapMemorySegment.FACTORY.wrap(new byte[2]);
+            HeapMemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[2]);
             if (isBigEndian) {
                 segment.putShortBigEndian(0, row.getShort(0));
             } else {
@@ -238,7 +239,7 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
 
         @Override
         public byte[] convert(RowData row) {
-            HeapMemorySegment segment = HeapMemorySegment.FACTORY.wrap(new byte[4]);
+            HeapMemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[4]);
             if (isBigEndian) {
                 segment.putIntBigEndian(0, row.getInt(0));
             } else {
@@ -259,7 +260,7 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
 
         @Override
         public byte[] convert(RowData row) {
-            HeapMemorySegment segment = HeapMemorySegment.FACTORY.wrap(new byte[8]);
+            HeapMemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[8]);
             if (isBigEndian) {
                 segment.putLongBigEndian(0, row.getLong(0));
             } else {
@@ -281,7 +282,7 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
 
         @Override
         public byte[] convert(RowData row) {
-            HeapMemorySegment segment = HeapMemorySegment.FACTORY.wrap(new byte[4]);
+            HeapMemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[4]);
             if (isBigEndian) {
                 segment.putFloatBigEndian(0, row.getFloat(0));
             } else {
@@ -303,7 +304,7 @@ public class RawFormatSerializationSchema implements SerializationSchema<RowData
 
         @Override
         public byte[] convert(RowData row) {
-            HeapMemorySegment segment = HeapMemorySegment.FACTORY.wrap(new byte[8]);
+            HeapMemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[8]);
             if (isBigEndian) {
                 segment.putDoubleBigEndian(0, row.getDouble(0));
             } else {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
index 5c5a3dd..9cbccef 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BaseHybridHashTable.java
@@ -43,7 +43,7 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
+import static org.apache.flink.core.memory.MemorySegmentFactory.allocateHeapSegment;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** Base table for {@link LongHybridHashTable} and {@link BinaryHashTable}. */
@@ -495,8 +495,7 @@ public abstract class BaseHybridHashTable implements MemorySegmentPool {
                         segmentSize);
         return new HeaderlessChannelReaderInputView(
                 inReader,
-                Arrays.asList(
-                        allocateUnpooledSegment(segmentSize), allocateUnpooledSegment(segmentSize)),
+                Arrays.asList(allocateHeapSegment(segmentSize), allocateHeapSegment(segmentSize)),
                 blockCount,
                 lastSegmentLimit,
                 false);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
index d5166de..c1e0b82 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/BinaryHashBucketArea.java
@@ -384,8 +384,7 @@ public class BinaryHashBucketArea {
                         // memory from heap.
                         // NOTE: must be careful, the steal memory should not return to table.
                         overflowSeg =
-                                MemorySegmentFactory.allocateUnpooledSegment(
-                                        table.segmentSize, this);
+                                MemorySegmentFactory.allocateHeapSegment(table.segmentSize, this);
                     } else {
                         final int spilledPart = table.spillPartition();
                         if (spilledPart == partition.partitionNumber) {
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
index f022e6f..3e47257 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/hashtable/LongHashPartition.java
@@ -896,7 +896,7 @@ public class LongHashPartition extends AbstractPagedInputView implements Seekabl
         inView.skipBytesToRead(8);
 
         if (segment == null || segment.size() < length) {
-            segment = MemorySegmentFactory.wrap(new byte[length]);
+            segment = MemorySegmentFactory.wrapHeapSegment(new byte[length]);
         }
         inView.readFully(segment.getHeapMemory(), 0, length);
         reuse.pointTo(segment, 0, length);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
index 9e10e67..3292970 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelReader.java
@@ -79,7 +79,8 @@ public class CompressedBlockChannelReader
         BlockCompressor compressor = codecFactory.getCompressor();
         for (int i = 0; i < 2; i++) {
             MemorySegment segment =
-                    MemorySegmentFactory.wrap(new byte[compressor.getMaxCompressedSize(blockSize)]);
+                    MemorySegmentFactory.wrapHeapSegment(
+                            new byte[compressor.getMaxCompressedSize(blockSize)]);
             reader.readInto(new NetworkBuffer(segment, this));
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java
index d64ae67..8464592 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedBlockChannelWriter.java
@@ -72,7 +72,7 @@ public class CompressedBlockChannelWriter
 
         for (int i = 0; i < 2; i++) {
             compressedBuffers.add(
-                    MemorySegmentFactory.wrap(
+                    MemorySegmentFactory.wrapHeapSegment(
                             new byte[compressor.getMaxCompressedSize(blockSize)]));
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
index 6666e9c..f584fa5 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelReaderInputView.java
@@ -70,14 +70,14 @@ public class CompressedHeaderlessChannelReaderInputView extends AbstractChannelR
         super(0);
         this.numBlocksRemaining = numBlocks;
         this.reader = ioManager.createBufferFileReader(id, this);
-        uncompressedBuffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]);
+        uncompressedBuffer = MemorySegmentFactory.wrapHeapSegment(new byte[compressionBlockSize]);
         decompressor = compressionCodecFactory.getDecompressor();
         cause = new AtomicReference<>();
 
         BlockCompressor compressor = compressionCodecFactory.getCompressor();
         for (int i = 0; i < 2; i++) {
             MemorySegment segment =
-                    MemorySegmentFactory.wrap(
+                    MemorySegmentFactory.wrapHeapSegment(
                             new byte[compressor.getMaxCompressedSize(compressionBlockSize)]);
             reader.readInto(new NetworkBuffer(segment, this));
         }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java
index 61d4766..ba72e5a 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelWriterOutputView.java
@@ -58,11 +58,11 @@ public final class CompressedHeaderlessChannelWriterOutputView
         super(compressionBlockSize, 0);
 
         this.compressionBlockSize = compressionBlockSize;
-        buffer = MemorySegmentFactory.wrap(new byte[compressionBlockSize]);
+        buffer = MemorySegmentFactory.wrapHeapSegment(new byte[compressionBlockSize]);
         compressor = compressionCodecFactory.getCompressor();
         for (int i = 0; i < 2; i++) {
             compressedBuffers.add(
-                    MemorySegmentFactory.wrap(
+                    MemorySegmentFactory.wrapHeapSegment(
                             new byte[compressor.getMaxCompressedSize(compressionBlockSize)]));
         }
         this.writer = writer;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/BytesHashMapSpillMemorySegmentPool.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/BytesHashMapSpillMemorySegmentPool.java
index a79d871..3432f9b 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/BytesHashMapSpillMemorySegmentPool.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/aggregate/BytesHashMapSpillMemorySegmentPool.java
@@ -46,7 +46,7 @@ public class BytesHashMapSpillMemorySegmentPool implements MemorySegmentPool {
         if (allocated <= segments.size()) {
             return segments.get(allocated - 1);
         } else {
-            return MemorySegmentFactory.wrap(new byte[pageSize()]);
+            return MemorySegmentFactory.wrapHeapSegment(new byte[pageSize()]);
         }
     }
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
index 0f04ccb..1f077d8 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/ArrayDataSerializer.java
@@ -231,7 +231,7 @@ public class ArrayDataSerializer extends TypeSerializer<ArrayData> {
         int length = source.readInt();
         byte[] bytes = new byte[length];
         source.readFully(bytes);
-        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+        reuse.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, bytes.length);
         return reuse;
     }
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
index 832b361..b2968da 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/BinaryRowDataSerializer.java
@@ -98,7 +98,7 @@ public class BinaryRowDataSerializer extends AbstractRowDataSerializer<BinaryRow
         int length = source.readInt();
         byte[] bytes = new byte[length];
         source.readFully(bytes);
-        row.pointTo(MemorySegmentFactory.wrap(bytes), 0, length);
+        row.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, length);
         return row;
     }
 
@@ -111,7 +111,7 @@ public class BinaryRowDataSerializer extends AbstractRowDataSerializer<BinaryRow
 
         int length = source.readInt();
         if (segments == null || segments[0].size() < length) {
-            segments = new MemorySegment[] {MemorySegmentFactory.wrap(new byte[length])};
+            segments = new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(new byte[length])};
         }
         source.readFully(segments[0].getArray(), 0, length);
         reuse.pointTo(segments, 0, length);
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
index 950cd33..980a433 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/MapDataSerializer.java
@@ -200,7 +200,7 @@ public class MapDataSerializer extends TypeSerializer<MapData> {
         int length = source.readInt();
         byte[] bytes = new byte[length];
         source.readFully(bytes);
-        reuse.pointTo(MemorySegmentFactory.wrap(bytes), 0, bytes.length);
+        reuse.pointTo(MemorySegmentFactory.wrapHeapSegment(bytes), 0, bytes.length);
         return reuse;
     }
 
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
index 3ee21df..1b7bfa7 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/typeutils/RawValueDataSerializer.java
@@ -64,7 +64,7 @@ public final class RawValueDataSerializer<T> extends TypeSerializer<RawValueData
         T newJavaObject =
                 rawValue.getJavaObject() == null ? null : serializer.copy(rawValue.getJavaObject());
         return new BinaryRawValueData<>(
-                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)},
+                new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)},
                 0,
                 bytes.length,
                 newJavaObject);
@@ -95,7 +95,7 @@ public final class RawValueDataSerializer<T> extends TypeSerializer<RawValueData
         byte[] bytes = new byte[length];
         source.readFully(bytes);
         return new BinaryRawValueData<>(
-                new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, 0, bytes.length);
+                new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)}, 0, bytes.length);
     }
 
     @Override
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
index 9d8ee6d..45e8ba6 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/FileChannelUtil.java
@@ -40,7 +40,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import static org.apache.flink.core.memory.MemorySegmentFactory.allocateUnpooledSegment;
+import static org.apache.flink.core.memory.MemorySegmentFactory.allocateHeapSegment;
 
 /** File channel util for runtime. */
 public class FileChannelUtil {
@@ -71,8 +71,7 @@ public class FileChannelUtil {
             return new HeaderlessChannelReaderInputView(
                     reader,
                     Arrays.asList(
-                            allocateUnpooledSegment(segmentSize),
-                            allocateUnpooledSegment(segmentSize)),
+                            allocateHeapSegment(segmentSize), allocateHeapSegment(segmentSize)),
                     channel.getBlockCount(),
                     channel.getNumBytesInLastBlock(),
                     false);
@@ -97,8 +96,7 @@ public class FileChannelUtil {
             return new HeaderlessChannelWriterOutputView(
                     blockWriter,
                     Arrays.asList(
-                            allocateUnpooledSegment(segmentSize),
-                            allocateUnpooledSegment(segmentSize)),
+                            allocateHeapSegment(segmentSize), allocateHeapSegment(segmentSize)),
                     segmentSize);
         }
     }
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
index 9a2746f..92bbf85 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/ResettableExternalBuffer.java
@@ -529,7 +529,7 @@ public class ResettableExternalBuffer implements ResettableRowBuffer {
             if (freeMemory == null) {
                 freeMemory = new ArrayList<>();
                 for (int i = 0; i < READ_BUFFER; i++) {
-                    freeMemory.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
+                    freeMemory.add(MemorySegmentFactory.allocateHeapSegment(segmentSize));
                 }
             }
             return freeMemory;
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
index c0e0938..6a89f9d 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/collections/binary/AbstractBytesHashMap.java
@@ -102,7 +102,7 @@ public abstract class AbstractBytesHashMap<K> extends BytesMap<K, BinaryRowData>
         if (valueTypes.length == 0) {
             this.hashSetMode = true;
             this.reusedValue = new BinaryRowData(0);
-            this.reusedValue.pointTo(MemorySegmentFactory.wrap(new byte[8]), 0, 8);
+            this.reusedValue.pointTo(MemorySegmentFactory.wrapHeapSegment(new byte[8]), 0, 8);
             LOG.info("BytesHashMap with hashSetMode = true.");
         } else {
             this.hashSetMode = false;
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
index e7d5773..db752cc 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryArrayDataTest.java
@@ -403,8 +403,8 @@ public class BinaryArrayDataTest {
     private static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
         int newSize = (bytes.length + 1) / 2 + baseOffset;
         MemorySegment[] ret = new MemorySegment[2];
-        ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
-        ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+        ret[0] = MemorySegmentFactory.wrapHeapSegment(new byte[newSize]);
+        ret[1] = MemorySegmentFactory.wrapHeapSegment(new byte[newSize]);
 
         ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
         ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
index c0c91a4..7dc7f5c 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryRowDataTest.java
@@ -100,7 +100,7 @@ public class BinaryRowDataTest {
         assertEquals(536, new BinaryRowData(65).getFixedLengthPartSize());
         assertEquals(1048, new BinaryRowData(128).getFixedLengthPartSize());
 
-        MemorySegment segment = MemorySegmentFactory.wrap(new byte[100]);
+        MemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[100]);
         BinaryRowData row = new BinaryRowData(2);
         row.pointTo(segment, 10, 48);
         assertSame(row.getSegments()[0], segment);
@@ -110,7 +110,7 @@ public class BinaryRowDataTest {
 
     @Test
     public void testSetAndGet() {
-        MemorySegment segment = MemorySegmentFactory.wrap(new byte[80]);
+        MemorySegment segment = MemorySegmentFactory.wrapHeapSegment(new byte[80]);
         BinaryRowData row = new BinaryRowData(9);
         row.pointTo(segment, 0, 80);
         row.setNullAt(0);
@@ -160,8 +160,8 @@ public class BinaryRowDataTest {
 
         // test copy from var segments.
         int subSize = row.getFixedLengthPartSize() + 10;
-        MemorySegment subMs1 = MemorySegmentFactory.wrap(new byte[subSize]);
-        MemorySegment subMs2 = MemorySegmentFactory.wrap(new byte[subSize]);
+        MemorySegment subMs1 = MemorySegmentFactory.wrapHeapSegment(new byte[subSize]);
+        MemorySegment subMs2 = MemorySegmentFactory.wrapHeapSegment(new byte[subSize]);
         row.getSegments()[0].copyTo(0, subMs1, 0, subSize);
         row.getSegments()[0].copyTo(subSize, subMs2, 0, row.getSizeInBytes() - subSize);
 
@@ -208,7 +208,7 @@ public class BinaryRowDataTest {
         MemorySegment[] memorySegments = new MemorySegment[5];
         ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
         for (int i = 0; i < 5; i++) {
-            memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
+            memorySegments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[64]);
             memorySegmentList.add(memorySegments[i]);
         }
 
@@ -675,7 +675,7 @@ public class BinaryRowDataTest {
         MemorySegment[] memorySegments = new MemorySegment[3];
         ArrayList<MemorySegment> memorySegmentList = new ArrayList<>();
         for (int i = 0; i < 3; i++) {
-            memorySegments[i] = MemorySegmentFactory.wrap(new byte[64]);
+            memorySegments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[64]);
             memorySegmentList.add(memorySegments[i]);
         }
         RandomAccessOutputView out = new RandomAccessOutputView(memorySegments, 64);
@@ -766,7 +766,7 @@ public class BinaryRowDataTest {
         MemorySegment[] memorySegments = new MemorySegment[segTotalNumber];
         Map<MemorySegment, Integer> msIndex = new HashMap<>();
         for (int i = 0; i < segTotalNumber; i++) {
-            memorySegments[i] = MemorySegmentFactory.wrap(new byte[segSize]);
+            memorySegments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[segSize]);
             msIndex.put(memorySegments[i], i);
         }
 
@@ -896,7 +896,7 @@ public class BinaryRowDataTest {
     public void testHashAndCopy() throws IOException {
         MemorySegment[] segments = new MemorySegment[3];
         for (int i = 0; i < 3; i++) {
-            segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+            segments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[64]);
         }
         RandomAccessOutputView out = new RandomAccessOutputView(segments, 64);
         BinaryRowDataSerializer serializer = new BinaryRowDataSerializer(2);
@@ -943,7 +943,8 @@ public class BinaryRowDataTest {
         BinaryStringData string = BinaryStringData.fromString("hahahahaha");
         RandomAccessOutputView out =
                 new RandomAccessOutputView(
-                        new MemorySegment[] {MemorySegmentFactory.wrap(new byte[1024])}, 64);
+                        new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(new byte[1024])},
+                        64);
         serializer.serialize(string, out);
 
         RandomAccessInputView input =
@@ -973,7 +974,7 @@ public class BinaryRowDataTest {
         {
             MemorySegment[] segments = new MemorySegment[4];
             for (int i = 0; i < segments.length; i++) {
-                segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+                segments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[64]);
             }
             RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
             serializer.serializeToPages(row24, out);
@@ -1005,7 +1006,7 @@ public class BinaryRowDataTest {
         {
             MemorySegment[] segments = new MemorySegment[7];
             for (int i = 0; i < segments.length; i++) {
-                segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+                segments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[64]);
             }
             RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
             serializer.serializeToPages(row24, out);
@@ -1037,7 +1038,7 @@ public class BinaryRowDataTest {
         {
             MemorySegment[] segments = new MemorySegment[3];
             for (int i = 0; i < segments.length; i++) {
-                segments[i] = MemorySegmentFactory.wrap(new byte[64]);
+                segments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[64]);
             }
             RandomAccessOutputView out = new RandomAccessOutputView(segments, segments[0].size());
             serializer.serializeToPages(row24, out);
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
index 4a8741f..a03f1a0 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
@@ -125,7 +125,8 @@ public class BinaryStringDataTest {
             string.getSegments()[0].get(segSize - pad, bytes2, 0, numBytes - segSize + pad);
             return BinaryStringData.fromAddress(
                     new MemorySegment[] {
-                        MemorySegmentFactory.wrap(bytes1), MemorySegmentFactory.wrap(bytes2)
+                        MemorySegmentFactory.wrapHeapSegment(bytes1),
+                        MemorySegmentFactory.wrapHeapSegment(bytes2)
                     },
                     pad,
                     numBytes);
@@ -190,8 +191,8 @@ public class BinaryStringDataTest {
         assertTrue(fromString("你好").compareTo(fromString("世界")) > 0);
         assertTrue(fromString("你好123").compareTo(fromString("你好122")) > 0);
 
-        MemorySegment segment1 = MemorySegmentFactory.allocateUnpooledSegment(1024);
-        MemorySegment segment2 = MemorySegmentFactory.allocateUnpooledSegment(1024);
+        MemorySegment segment1 = MemorySegmentFactory.allocateHeapSegment(1024);
+        MemorySegment segment2 = MemorySegmentFactory.allocateHeapSegment(1024);
         SortUtil.putStringNormalizedKey(fromString("abcabcabc"), segment1, 0, 9);
         SortUtil.putStringNormalizedKey(fromString("abcabcabC"), segment2, 0, 9);
         assertTrue(segment1.compare(segment2, 0, 0, 9) > 0);
@@ -204,14 +205,14 @@ public class BinaryStringDataTest {
 
         // prepare
         MemorySegment[] segments1 = new MemorySegment[2];
-        segments1[0] = MemorySegmentFactory.wrap(new byte[10]);
-        segments1[1] = MemorySegmentFactory.wrap(new byte[10]);
+        segments1[0] = MemorySegmentFactory.wrapHeapSegment(new byte[10]);
+        segments1[1] = MemorySegmentFactory.wrapHeapSegment(new byte[10]);
         segments1[0].put(5, "abcde".getBytes(UTF_8), 0, 5);
         segments1[1].put(0, "aaaaa".getBytes(UTF_8), 0, 5);
 
         MemorySegment[] segments2 = new MemorySegment[2];
-        segments2[0] = MemorySegmentFactory.wrap(new byte[5]);
-        segments2[1] = MemorySegmentFactory.wrap(new byte[5]);
+        segments2[0] = MemorySegmentFactory.wrapHeapSegment(new byte[5]);
+        segments2[1] = MemorySegmentFactory.wrapHeapSegment(new byte[5]);
         segments2[0].put(0, "abcde".getBytes(UTF_8), 0, 5);
         segments2[1].put(0, "b".getBytes(UTF_8), 0, 1);
 
@@ -238,7 +239,7 @@ public class BinaryStringDataTest {
         assertEquals(1, binaryString2.compareTo(binaryString1));
 
         // test go ahead single
-        segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(new byte[10])};
+        segments2 = new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(new byte[10])};
         segments2[0].put(4, "abcdeb".getBytes(UTF_8), 0, 6);
         binaryString1 = BinaryStringData.fromAddress(segments1, 5, 10);
         binaryString2 = BinaryStringData.fromAddress(segments2, 4, 6);
@@ -578,8 +579,8 @@ public class BinaryStringDataTest {
         BinaryStringData str3;
         {
             MemorySegment[] segments = new MemorySegment[2];
-            segments[0] = MemorySegmentFactory.wrap(new byte[10]);
-            segments[1] = MemorySegmentFactory.wrap(new byte[10]);
+            segments[0] = MemorySegmentFactory.wrapHeapSegment(new byte[10]);
+            segments[1] = MemorySegmentFactory.wrapHeapSegment(new byte[10]);
             str3 = BinaryStringData.fromAddress(segments, 15, 0);
         }
 
@@ -757,13 +758,16 @@ public class BinaryStringDataTest {
         String str = new String(bytes, StandardCharsets.UTF_8);
         assertEquals(str, StringUtf8Utils.decodeUTF8(bytes, 0, bytes.length));
         assertEquals(
-                str, StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(bytes), 0, bytes.length));
+                str,
+                StringUtf8Utils.decodeUTF8(
+                        MemorySegmentFactory.wrapHeapSegment(bytes), 0, bytes.length));
 
         byte[] newBytes = new byte[bytes.length + 5];
         System.arraycopy(bytes, 0, newBytes, 5, bytes.length);
         assertEquals(
                 str,
-                StringUtf8Utils.decodeUTF8(MemorySegmentFactory.wrap(newBytes), 5, bytes.length));
+                StringUtf8Utils.decodeUTF8(
+                        MemorySegmentFactory.wrapHeapSegment(newBytes), 5, bytes.length));
     }
 
     @Test
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/binary/BinarySegmentUtilsTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/binary/BinarySegmentUtilsTest.java
index 111524d..7ec5e61 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/binary/BinarySegmentUtilsTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/binary/BinarySegmentUtilsTest.java
@@ -41,8 +41,8 @@ public class BinarySegmentUtilsTest {
     public void testCopy() {
         // test copy the content of the latter Seg
         MemorySegment[] segments = new MemorySegment[2];
-        segments[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
-        segments[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
+        segments[0] = MemorySegmentFactory.wrapHeapSegment(new byte[] {0, 2, 5});
+        segments[1] = MemorySegmentFactory.wrapHeapSegment(new byte[] {6, 12, 15});
 
         byte[] bytes = BinarySegmentUtils.copyToBytes(segments, 4, 2);
         Assert.assertArrayEquals(new byte[] {12, 15}, bytes);
@@ -52,13 +52,13 @@ public class BinarySegmentUtilsTest {
     public void testEquals() {
         // test copy the content of the latter Seg
         MemorySegment[] segments1 = new MemorySegment[3];
-        segments1[0] = MemorySegmentFactory.wrap(new byte[] {0, 2, 5});
-        segments1[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15});
-        segments1[2] = MemorySegmentFactory.wrap(new byte[] {1, 1, 1});
+        segments1[0] = MemorySegmentFactory.wrapHeapSegment(new byte[] {0, 2, 5});
+        segments1[1] = MemorySegmentFactory.wrapHeapSegment(new byte[] {6, 12, 15});
+        segments1[2] = MemorySegmentFactory.wrapHeapSegment(new byte[] {1, 1, 1});
 
         MemorySegment[] segments2 = new MemorySegment[2];
-        segments2[0] = MemorySegmentFactory.wrap(new byte[] {6, 0, 2, 5});
-        segments2[1] = MemorySegmentFactory.wrap(new byte[] {6, 12, 15, 18});
+        segments2[0] = MemorySegmentFactory.wrapHeapSegment(new byte[] {6, 0, 2, 5});
+        segments2[1] = MemorySegmentFactory.wrapHeapSegment(new byte[] {6, 12, 15, 18});
 
         assertTrue(BinarySegmentUtils.equalsMultiSegments(segments1, 0, segments2, 0, 0));
         assertTrue(BinarySegmentUtils.equals(segments1, 0, segments2, 1, 3));
@@ -101,12 +101,12 @@ public class BinarySegmentUtilsTest {
 
         // test var segs
         MemorySegment[] segments1 = new MemorySegment[2];
-        segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
-        segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+        segments1[0] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
+        segments1[1] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
         MemorySegment[] segments2 = new MemorySegment[3];
-        segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
-        segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
-        segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+        segments2[0] = MemorySegmentFactory.wrapHeapSegment(new byte[16]);
+        segments2[1] = MemorySegmentFactory.wrapHeapSegment(new byte[16]);
+        segments2[2] = MemorySegmentFactory.wrapHeapSegment(new byte[16]);
 
         segments1[0].put(9, (byte) 1);
         assertFalse(BinarySegmentUtils.equals(segments1, 0, segments2, 14, 14));
@@ -122,14 +122,15 @@ public class BinarySegmentUtilsTest {
     @Test
     public void testBoundaryCopy() {
         MemorySegment[] segments1 = new MemorySegment[2];
-        segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
-        segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+        segments1[0] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
+        segments1[1] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
         segments1[0].put(15, (byte) 5);
         segments1[1].put(15, (byte) 6);
 
         {
             byte[] bytes = new byte[64];
-            MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+            MemorySegment[] segments2 =
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)};
 
             BinarySegmentUtils.copyToBytes(segments1, 0, bytes, 0, 64);
             assertTrue(BinarySegmentUtils.equals(segments1, 0, segments2, 0, 64));
@@ -137,7 +138,8 @@ public class BinarySegmentUtilsTest {
 
         {
             byte[] bytes = new byte[64];
-            MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+            MemorySegment[] segments2 =
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)};
 
             BinarySegmentUtils.copyToBytes(segments1, 32, bytes, 0, 14);
             assertTrue(BinarySegmentUtils.equals(segments1, 32, segments2, 0, 14));
@@ -145,7 +147,8 @@ public class BinarySegmentUtilsTest {
 
         {
             byte[] bytes = new byte[64];
-            MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+            MemorySegment[] segments2 =
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)};
 
             BinarySegmentUtils.copyToBytes(segments1, 34, bytes, 0, 14);
             assertTrue(BinarySegmentUtils.equals(segments1, 34, segments2, 0, 14));
@@ -155,14 +158,15 @@ public class BinarySegmentUtilsTest {
     @Test
     public void testCopyToUnsafe() {
         MemorySegment[] segments1 = new MemorySegment[2];
-        segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
-        segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+        segments1[0] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
+        segments1[1] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
         segments1[0].put(15, (byte) 5);
         segments1[1].put(15, (byte) 6);
 
         {
             byte[] bytes = new byte[64];
-            MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+            MemorySegment[] segments2 =
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)};
 
             BinarySegmentUtils.copyToUnsafe(segments1, 0, bytes, BYTE_ARRAY_BASE_OFFSET, 64);
             assertTrue(BinarySegmentUtils.equals(segments1, 0, segments2, 0, 64));
@@ -170,7 +174,8 @@ public class BinarySegmentUtilsTest {
 
         {
             byte[] bytes = new byte[64];
-            MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+            MemorySegment[] segments2 =
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)};
 
             BinarySegmentUtils.copyToUnsafe(segments1, 32, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
             assertTrue(BinarySegmentUtils.equals(segments1, 32, segments2, 0, 14));
@@ -178,7 +183,8 @@ public class BinarySegmentUtilsTest {
 
         {
             byte[] bytes = new byte[64];
-            MemorySegment[] segments2 = new MemorySegment[] {MemorySegmentFactory.wrap(bytes)};
+            MemorySegment[] segments2 =
+                    new MemorySegment[] {MemorySegmentFactory.wrapHeapSegment(bytes)};
 
             BinarySegmentUtils.copyToUnsafe(segments1, 34, bytes, BYTE_ARRAY_BASE_OFFSET, 14);
             assertTrue(BinarySegmentUtils.equals(segments1, 34, segments2, 0, 14));
@@ -188,12 +194,12 @@ public class BinarySegmentUtilsTest {
     @Test
     public void testFind() {
         MemorySegment[] segments1 = new MemorySegment[2];
-        segments1[0] = MemorySegmentFactory.wrap(new byte[32]);
-        segments1[1] = MemorySegmentFactory.wrap(new byte[32]);
+        segments1[0] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
+        segments1[1] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
         MemorySegment[] segments2 = new MemorySegment[3];
-        segments2[0] = MemorySegmentFactory.wrap(new byte[16]);
-        segments2[1] = MemorySegmentFactory.wrap(new byte[16]);
-        segments2[2] = MemorySegmentFactory.wrap(new byte[16]);
+        segments2[0] = MemorySegmentFactory.wrapHeapSegment(new byte[16]);
+        segments2[1] = MemorySegmentFactory.wrapHeapSegment(new byte[16]);
+        segments2[2] = MemorySegmentFactory.wrapHeapSegment(new byte[16]);
 
         assertEquals(34, BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 0));
         assertEquals(-1, BinarySegmentUtils.find(segments1, 34, 0, segments2, 0, 15));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/util/DataFormatTestUtil.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/util/DataFormatTestUtil.java
index 6c97813..d178fef 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/util/DataFormatTestUtil.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/util/DataFormatTestUtil.java
@@ -86,7 +86,7 @@ public class DataFormatTestUtil {
         int posInSeg = baseOffset;
         int remainSize = 160;
         for (int i = 0; i < segments.length; i++) {
-            segments[i] = MemorySegmentFactory.wrap(new byte[32]);
+            segments[i] = MemorySegmentFactory.wrapHeapSegment(new byte[32]);
             int copy = Math.min(32 - posInSeg, remainSize);
             row160.getSegments()[0].copyTo(160 - remainSize, segments[i], posInSeg, copy);
             remainSize -= copy;
@@ -104,7 +104,8 @@ public class DataFormatTestUtil {
     public static BinaryRowData getMultiSeg160BytesInOneSegRow(BinaryRowData row160) {
         MemorySegment[] segments = new MemorySegment[2];
         segments[0] = row160.getSegments()[0];
-        segments[1] = MemorySegmentFactory.wrap(new byte[row160.getSegments()[0].size()]);
+        segments[1] =
+                MemorySegmentFactory.wrapHeapSegment(new byte[row160.getSegments()[0].size()]);
         row160.pointTo(segments, 0, row160.getSizeInBytes());
         return row160;
     }
@@ -113,8 +114,8 @@ public class DataFormatTestUtil {
     public static MemorySegment[] splitBytes(byte[] bytes, int baseOffset) {
         int newSize = (bytes.length + 1) / 2 + baseOffset;
         MemorySegment[] ret = new MemorySegment[2];
-        ret[0] = MemorySegmentFactory.wrap(new byte[newSize]);
-        ret[1] = MemorySegmentFactory.wrap(new byte[newSize]);
+        ret[0] = MemorySegmentFactory.wrapHeapSegment(new byte[newSize]);
+        ret[1] = MemorySegmentFactory.wrapHeapSegment(new byte[newSize]);
 
         ret[0].put(baseOffset, bytes, 0, newSize - baseOffset);
         ret[1].put(0, bytes, newSize - baseOffset, bytes.length - (newSize - baseOffset));
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/SortUtilTest.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/SortUtilTest.java
index d0e267a..4fc292b 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/SortUtilTest.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/SortUtilTest.java
@@ -41,8 +41,8 @@ public class SortUtilTest {
         MemorySegment[] segments = new MemorySegment[len];
         MemorySegment[] compareSegs = new MemorySegment[len];
         for (int i = 0; i < len; i++) {
-            segments[i] = MemorySegmentFactory.allocateUnpooledSegment(20);
-            compareSegs[i] = MemorySegmentFactory.allocateUnpooledSegment(20);
+            segments[i] = MemorySegmentFactory.allocateHeapSegment(20);
+            compareSegs[i] = MemorySegmentFactory.allocateHeapSegment(20);
         }
 
         {
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/TestMemorySegmentPool.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/TestMemorySegmentPool.java
index 2d1daed..3f56f5f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/TestMemorySegmentPool.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/sort/TestMemorySegmentPool.java
@@ -48,6 +48,6 @@ public class TestMemorySegmentPool implements MemorySegmentPool {
 
     @Override
     public MemorySegment nextSegment() {
-        return MemorySegmentFactory.wrap(new byte[pageSize]);
+        return MemorySegmentFactory.wrapHeapSegment(new byte[pageSize]);
     }
 }
diff --git a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
index 44a3233..a8e4d4f 100644
--- a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
+++ b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/util/collections/binary/BytesHashMapTestBase.java
@@ -79,7 +79,8 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
         this.valueSerializer = new BinaryRowDataSerializer(VALUE_TYPES.length);
         this.defaultValue = valueSerializer.createInstance();
         int valueSize = defaultValue.getFixedLengthPartSize();
-        this.defaultValue.pointTo(MemorySegmentFactory.wrap(new byte[valueSize]), 0, valueSize);
+        this.defaultValue.pointTo(
+                MemorySegmentFactory.wrapHeapSegment(new byte[valueSize]), 0, valueSize);
     }
 
     /**
@@ -363,7 +364,7 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
     private void verifyKeyPresent(K[] keys, AbstractBytesHashMap<K> table) {
         Assert.assertEquals(NUM_ENTRIES, table.getNumElements());
         BinaryRowData present = new BinaryRowData(0);
-        present.pointTo(MemorySegmentFactory.wrap(new byte[8]), 0, 8);
+        present.pointTo(MemorySegmentFactory.wrapHeapSegment(new byte[8]), 0, 8);
         for (int i = 0; i < NUM_ENTRIES; i++) {
             K groupKey = keys[i];
             // look up and retrieve
@@ -376,7 +377,7 @@ public abstract class BytesHashMapTestBase<K> extends BytesMapTestBase {
 
     private void verifyKeyInsert(K[] keys, AbstractBytesHashMap<K> table) throws IOException {
         BinaryRowData present = new BinaryRowData(0);
-        present.pointTo(MemorySegmentFactory.wrap(new byte[8]), 0, 8);
+        present.pointTo(MemorySegmentFactory.wrapHeapSegment(new byte[8]), 0, 8);
         for (int i = 0; i < NUM_ENTRIES; i++) {
             K groupKey = keys[i];
             // look up and insert
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index b02ca93..8baab98 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -194,7 +194,7 @@ public class HashTableRecordWidthCombinations {
     private static List<MemorySegment> getMemory(int numSegments, int segmentSize) {
         ArrayList<MemorySegment> list = new ArrayList<MemorySegment>(numSegments);
         for (int i = 0; i < numSegments; i++) {
-            list.add(MemorySegmentFactory.allocateUnpooledSegment(segmentSize));
+            list.add(MemorySegmentFactory.allocateHeapSegment(segmentSize));
         }
         return list;
     }
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
index 3a158d6..c9481a4 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/CaseClassNormalizedKeySortingTest.scala
@@ -75,7 +75,7 @@ class CaseClassNormalizedKeySortingTest {
     val numMemSegs = 20
     val memory : util.List[MemorySegment] = new util.ArrayList[MemorySegment](numMemSegs)
     for (i <- 1 to numMemSegs) {
-      memory.add(MemorySegmentFactory.allocateUnpooledSegment(32*1024))
+      memory.add(MemorySegmentFactory.allocateHeapSegment(32*1024))
     }
 
     val sorter : NormalizedKeySorter[CaseTestClass] = new NormalizedKeySorter[CaseTestClass](