You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/09/18 06:23:35 UTC

[GitHub] [flink] KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools

KarmaGYZ commented on a change in pull request #9693: [FLINK-13984] Separate on-heap and off-heap managed memory pools
URL: https://github.com/apache/flink/pull/9693#discussion_r325475374
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 ##########
 @@ -474,60 +403,121 @@ public void releaseAll(Object owner) {
 			return;
 		}
 
-		// -------------------- BEGIN CRITICAL SECTION -------------------
-		synchronized (lock) {
-			if (isShutDown) {
-				throw new IllegalStateException("Memory manager has been shut down.");
-			}
-
-			// get all segments
-			final Set<MemorySegment> segments = allocatedSegments.remove(owner);
+		Preconditions.checkState(!isShutDown, "Memory manager has been shut down.");
 
-			// all segments may have been freed previously individually
-			if (segments == null || segments.isEmpty()) {
-				return;
-			}
+		// get all segments
+		Set<MemorySegment> segments = allocatedSegments.remove(owner);
 
-			// free each segment
-			if (isPreAllocated) {
-				for (MemorySegment seg : segments) {
-					memoryPool.returnSegmentToPool(seg);
-				}
-			}
-			else {
-				for (MemorySegment seg : segments) {
-					seg.free();
-				}
-				numNonAllocatedPages += segments.size();
-			}
+		// all segments may have been freed previously individually
+		if (segments == null || segments.isEmpty()) {
+			return;
+		}
 
-			segments.clear();
+		// free each segment
+		EnumMap<MemoryType, Long> releasedMemory = new EnumMap<>(MemoryType.class);
+		for (MemorySegment segment : segments) {
+			releaseSegment(segment, releasedMemory);
 		}
-		// -------------------- END CRITICAL SECTION -------------------
+		budgetByType.releaseBudgetForKeys(releasedMemory);
+
+		segments.clear();
 	}
 
-	// ------------------------------------------------------------------------
-	//  Properties, sizes and size conversions
-	// ------------------------------------------------------------------------
+	/**
+	 * Reserves memory of a certain type for an owner from this memory manager.
+	 *
+	 * @param owner The owner to associate with the memory reservation, for the fallback release.
+	 * @param memoryType type of memory to reserve (heap / off-heap).
+	 * @param size size of memory to reserve.
+	 * @throws MemoryAllocationException Thrown, if this memory manager does not have the requested amount
+	 *                                   of memory any more.
+	 */
+	public void reserveMemory(Object owner, MemoryType memoryType, long size) throws MemoryAllocationException {
+		checkMemoryReservationPreconditions(owner, memoryType, size);
+		if (size == 0L) {
+			return;
+		}
+
+		long acquiredMemory = budgetByType.acquireBudgetForKey(memoryType, size);
+		if (acquiredMemory < size) {
+			throw new MemoryAllocationException(
+				String.format("Could not allocate %d bytes. Only %d bytes are remaining.", size, acquiredMemory));
+		}
+
+		reservedMemory.compute(owner, (o, reservations) -> {
+			Map<MemoryType, Long> newReservations = reservations;
+			if (reservations == null) {
+				newReservations = new EnumMap<>(MemoryType.class);
+				newReservations.put(memoryType, size);
+			} else {
+				reservations.compute(
+					memoryType,
+					(mt, currentlyReserved) -> currentlyReserved == null ? size : currentlyReserved + size);
+			}
+			return newReservations;
+		});
+
+		Preconditions.checkState(!isShutDown, "Memory manager has been concurrently shut down.");
+	}
 
 	/**
-	 * Gets the type of memory (heap / off-heap) managed by this memory manager.
+	 * Releases memory of a certain type from an owner to this memory manager.
 	 *
-	 * @return The type of memory managed by this memory manager.
+	 * @param owner The owner to associate with the memory reservation, for the fallback release.
+	 * @param memoryType type of memory to release (heap / off-heap).
+	 * @param size size of memory to release.
 	 */
-	public MemoryType getMemoryType() {
-		return memoryType;
+	public void releaseMemory(Object owner, MemoryType memoryType, long size) {
+		checkMemoryReservationPreconditions(owner, memoryType, size);
+		if (size == 0L) {
+			return;
+		}
+
+		reservedMemory.compute(owner, (o, reservations) -> {
+			if (reservations != null) {
+				reservations.compute(
+					memoryType,
+					(mt, currentlyReserved) ->
+						currentlyReserved == null || currentlyReserved <= size ? null : currentlyReserved - size);
 
 Review comment:
   For currentlyReserved == null || currentlyReserved <= size, we may add log to hint that owner does not have such memory reserved.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services