You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:49 UTC

[7/7] flink git commit: [FLINK-4545] [network] Small adjustments to LocalBufferPool with a limited number of used buffers

[FLINK-4545] [network] Small adjustments to LocalBufferPool with a limited number of used buffers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/206ea211
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/206ea211
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/206ea211

Branch: refs/heads/master
Commit: 206ea2119ce80c7d4920a471eb58c1d30abbd995
Parents: 11e2aa6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 10 12:25:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100

----------------------------------------------------------------------
 .../io/network/buffer/LocalBufferPool.java      |  7 ++---
 .../io/network/buffer/NetworkBufferPool.java    | 27 +++++++++++++-------
 .../runtime/taskmanager/TaskManagerTest.java    |  2 +-
 3 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a587997..b485fd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -58,7 +58,7 @@ class LocalBufferPool implements BufferPool {
 	 * The currently available memory segments. These are segments, which have been requested from
 	 * the network buffer pool and are currently not handed out as Buffer instances.
 	 */
-	private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
+	private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
 
 	/**
 	 * Buffer availability listeners, which need to be notified when a Buffer becomes available.
@@ -297,8 +297,9 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public void setNumBuffers(int numBuffers) throws IOException {
 		synchronized (availableMemorySegments) {
-			checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
-				numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+			checkArgument(numBuffers >= numberOfRequiredMemorySegments,
+					"Buffer pool needs at least %s buffers, but tried to set to %s",
+					numberOfRequiredMemorySegments, numBuffers);
 
 			if (numBuffers > maxNumberOfMemorySegments) {
 				currentPoolSize = maxNumberOfMemorySegments;

http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 7668759..5f2da03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -22,13 +22,13 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
-import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 
@@ -50,7 +50,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 	private final int memorySegmentSize;
 
-	private final Queue<MemorySegment> availableMemorySegments;
+	private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;
 
 	private volatile boolean isDestroyed;
 
@@ -124,9 +124,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		return availableMemorySegments.poll();
 	}
 
-	// This is not safe with regard to destroy calls, but it does not hurt, because destroy happens
-	// only once at clean up time (task manager shutdown).
 	public void recycle(MemorySegment segment) {
+		// Adds the segment back to the queue. This does not immediately free the memory;
+		// however, since this happens when references to the global pool are also released,
+		// the availableMemorySegments queue and its contained objects become reclaimable.
 		availableMemorySegments.add(segment);
 	}
 
@@ -260,7 +261,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		assert Thread.holdsLock(factoryLock);
 
 		// All buffers, which are not among the required ones
-		int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+		final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
 
 		if (numAvailableMemorySegment == 0) {
 			// in this case, we need to redistribute buffers so that every pool gets its minimum
@@ -278,7 +279,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
 		 * a ratio that we use to distribute the buffers.
 		 */
 
-		int totalCapacity = 0;
+		long totalCapacity = 0; // long to avoid int overflow
+
 		for (LocalBufferPool bufferPool : allBufferPools) {
 			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
 				bufferPool.getNumberOfRequiredMemorySegments();
@@ -290,9 +292,13 @@ public class NetworkBufferPool implements BufferPoolFactory {
 			return; // necessary to avoid div by zero when nothing to re-distribute
 		}
 
-		int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
+		// since one of the arguments of 'min(a,b)' is a positive int, this is actually
+		// guaranteed to be within the 'int' domain
+		// (we use a checked downCast to handle possible bugs more gracefully).
+		final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
+				Math.min(numAvailableMemorySegment, totalCapacity));
 
-		int totalPartsUsed = 0; // of totalCapacity
+		long totalPartsUsed = 0; // of totalCapacity
 		int numDistributedMemorySegment = 0;
 		for (LocalBufferPool bufferPool : allBufferPools) {
 			int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
@@ -307,7 +313,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
 
 			// avoid remaining buffers by looking at the total capacity that should have been
 			// re-distributed up until here
-			int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+			// the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
+			final int mySize = MathUtils.checkedDownCast(
+					memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
+
 			numDistributedMemorySegment += mySize;
 			bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0b39e20..730595c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -991,7 +991,7 @@ public class TaskManagerTest extends TestLogger {
 		config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
 
 		TaskManagerServicesConfiguration tmConfig =
-			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);
 
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
 		assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);