Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 10:17:00 UTC

[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

    [ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633823#comment-16633823 ] 

ASF GitHub Bot commented on FLINK-10339:
----------------------------------------

NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
 		return new HybridMemorySegment(new byte[size], owner);
 	}
 
+	/**
+	 * Allocates some unpooled off-heap memory and creates a new memory segment that
+	 * represents that memory.
+	 *
+	 * @param size The size of the off-heap memory segment to allocate.
+	 * @param owner The owner to associate with the off-heap memory segment.
+	 * @return A new memory segment, backed by unpooled off-heap memory.
+	 */
+	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
+		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		return wrapPooledOffHeapMemory(memory, owner);
+	}
+
 	/**
 	 * Creates a memory segment that wraps the given byte array.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 
 		try {
 			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-				ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-				availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
+				availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
 			}
 		}
 		catch (OutOfMemoryError err) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 
 			synchronized (buffers) {
 				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+						memorySegmentSize, null), this));
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 
 		@Override
 		MemorySegment allocateNewSegment(Object owner) {
-			ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-			return MemorySegmentFactory.wrapPooledOffHeapMemory(memory, owner);
+			return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
 		}
 
 		@Override


 



> SpillReadBufferPool cannot use off-heap memory
> ----------------------------------------------
>
>                 Key: FLINK-10339
>                 URL: https://issues.apache.org/jira/browse/FLINK-10339
>             Project: Flink
>          Issue Type: Improvement
>          Components: Network
>    Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>            Reporter: zhijiang
>            Assignee: zhijiang
>            Priority: Minor
>              Labels: pull-request-available
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce memory copies from Flink's {{Buffer}} to Netty's internal {{ByteBuf}} during transport on the sender side.
>  
> The {{SpillReadBufferPool}} in {{SpilledSubpartitionView}}, however, still uses heap memory for caching. We could make it off-heap by default, as in {{NetworkBufferPool}}, or choose the memory type based on the existing parameter {{taskmanager.memory.off-heap}}.
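
The two options in the description (heap versus off-heap buffers, chosen by a flag) can be illustrated with a minimal sketch that uses only the JDK's ByteBuffer rather than Flink's MemorySegmentFactory. The class name SpillBufferSketch, the helper allocate, and the boolean offHeap flag are hypothetical and are not part of the pull request; in Flink the flag would presumably be derived from a setting such as taskmanager.memory.off-heap.

```java
import java.nio.ByteBuffer;

public class SpillBufferSketch {

    /**
     * Hypothetical helper (not a Flink API): allocates one read buffer,
     * either as a direct (off-heap) buffer or as a heap buffer.
     */
    public static ByteBuffer allocate(int size, boolean offHeap) {
        return offHeap
            ? ByteBuffer.allocateDirect(size)  // native memory outside the Java heap
            : ByteBuffer.allocate(size);       // backed by a byte[] on the Java heap
    }

    public static void main(String[] args) {
        int segmentSize = 32 * 1024; // assumed segment size, for illustration only

        ByteBuffer heap = allocate(segmentSize, false);
        ByteBuffer direct = allocate(segmentSize, true);

        // A heap buffer exposes its backing array; a direct buffer does not.
        System.out.println("heap:   isDirect=" + heap.isDirect() + ", hasArray=" + heap.hasArray());
        System.out.println("direct: isDirect=" + direct.isDirect() + ", hasArray=" + direct.hasArray());
    }
}
```

A direct buffer can generally be handed to native I/O without first being copied out of the Java heap, which is the motivation given above for making the spill read buffers off-heap as well.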


