You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/03/10 13:42:49 UTC
[7/7] flink git commit: [FLINK-4545] [network] Small adjustments to
LocalBufferPool with a limited number of used buffers
[FLINK-4545] [network] Small adjustments to LocalBufferPool with a limited number of used buffers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/206ea211
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/206ea211
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/206ea211
Branch: refs/heads/master
Commit: 206ea2119ce80c7d4920a471eb58c1d30abbd995
Parents: 11e2aa6
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 10 12:25:17 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 10 13:30:58 2017 +0100
----------------------------------------------------------------------
.../io/network/buffer/LocalBufferPool.java | 7 ++---
.../io/network/buffer/NetworkBufferPool.java | 27 +++++++++++++-------
.../runtime/taskmanager/TaskManagerTest.java | 2 +-
3 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a587997..b485fd1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -58,7 +58,7 @@ class LocalBufferPool implements BufferPool {
* The currently available memory segments. These are segments, which have been requested from
* the network buffer pool and are currently not handed out as Buffer instances.
*/
- private final Queue<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
+ private final ArrayDeque<MemorySegment> availableMemorySegments = new ArrayDeque<MemorySegment>();
/**
* Buffer availability listeners, which need to be notified when a Buffer becomes available.
@@ -297,8 +297,9 @@ class LocalBufferPool implements BufferPool {
@Override
public void setNumBuffers(int numBuffers) throws IOException {
synchronized (availableMemorySegments) {
- checkArgument(numBuffers >= numberOfRequiredMemorySegments, "Buffer pool needs at least " +
- numberOfRequiredMemorySegments + " buffers, but tried to set to " + numBuffers + ".");
+ checkArgument(numBuffers >= numberOfRequiredMemorySegments,
+ "Buffer pool needs at least %s buffers, but tried to set to %s",
+ numberOfRequiredMemorySegments, numBuffers);
if (numBuffers > maxNumberOfMemorySegments) {
currentPoolSize = maxNumberOfMemorySegments;
http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 7668759..5f2da03 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -22,13 +22,13 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashSet;
-import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
@@ -50,7 +50,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
private final int memorySegmentSize;
- private final Queue<MemorySegment> availableMemorySegments;
+ private final ArrayBlockingQueue<MemorySegment> availableMemorySegments;
private volatile boolean isDestroyed;
@@ -124,9 +124,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
return availableMemorySegments.poll();
}
- // This is not safe with regard to destroy calls, but it does not hurt, because destroy happens
- // only once at clean up time (task manager shutdown).
public void recycle(MemorySegment segment) {
+ // Adds the segment back to the queue, which does not immediately free the memory.
+ // This is acceptable, however, since it happens when references to the global pool are
+ // also released, making the availableMemorySegments queue and its contained objects reclaimable.
availableMemorySegments.add(segment);
}
@@ -260,7 +261,7 @@ public class NetworkBufferPool implements BufferPoolFactory {
assert Thread.holdsLock(factoryLock);
// All buffers, which are not among the required ones
- int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
+ final int numAvailableMemorySegment = totalNumberOfMemorySegments - numTotalRequiredBuffers;
if (numAvailableMemorySegment == 0) {
// in this case, we need to redistribute buffers so that every pool gets its minimum
@@ -278,7 +279,8 @@ public class NetworkBufferPool implements BufferPoolFactory {
* a ratio that we use to distribute the buffers.
*/
- int totalCapacity = 0;
+ long totalCapacity = 0; // long to avoid int overflow
+
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
bufferPool.getNumberOfRequiredMemorySegments();
@@ -290,9 +292,13 @@ public class NetworkBufferPool implements BufferPoolFactory {
return; // necessary to avoid div by zero when nothing to re-distribute
}
- int memorySegmentsToDistribute = Math.min(numAvailableMemorySegment, totalCapacity);
+ // since one of the arguments of 'min(a,b)' is a positive int, this is actually
+ // guaranteed to be within the 'int' domain
+ // (we use a checked downCast to handle possible bugs more gracefully).
+ final int memorySegmentsToDistribute = MathUtils.checkedDownCast(
+ Math.min(numAvailableMemorySegment, totalCapacity));
- int totalPartsUsed = 0; // of totalCapacity
+ long totalPartsUsed = 0; // of totalCapacity
int numDistributedMemorySegment = 0;
for (LocalBufferPool bufferPool : allBufferPools) {
int excessMax = bufferPool.getMaxNumberOfMemorySegments() -
@@ -307,7 +313,10 @@ public class NetworkBufferPool implements BufferPoolFactory {
// avoid remaining buffers by looking at the total capacity that should have been
// re-distributed up until here
- int mySize = memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment;
+ // the downcast will always succeed, because both arguments of the subtraction are in the 'int' domain
+ final int mySize = MathUtils.checkedDownCast(
+ memorySegmentsToDistribute * totalPartsUsed / totalCapacity - numDistributedMemorySegment);
+
numDistributedMemorySegment += mySize;
bufferPool.setNumBuffers(bufferPool.getNumberOfRequiredMemorySegments() + mySize);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/206ea211/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 0b39e20..730595c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -991,7 +991,7 @@ public class TaskManagerTest extends TestLogger {
config.setInteger(TaskManagerOptions.NETWORK_EXTRA_BUFFERS_PER_GATE, 100);
TaskManagerServicesConfiguration tmConfig =
- TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getByName("localhost"), true);
+ TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLoopbackAddress(), true);
assertEquals(tmConfig.getNetworkConfig().partitionRequestInitialBackoff(), 100);
assertEquals(tmConfig.getNetworkConfig().partitionRequestMaxBackoff(), 200);