You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/23 13:26:40 UTC

[GitHub] asfgit closed pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool

asfgit closed pull request #6809: [FLINK-10491][network] Pass BufferPoolOwner in the constructor of LocalBufferPool
URL: https://github.com/apache/flink/pull/6809
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f2547563872..d124456c213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -43,6 +43,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -90,19 +91,43 @@
 	private boolean isShutdown;
 
 	public NetworkEnvironment(
-			NetworkBufferPool networkBufferPool,
-			ConnectionManager connectionManager,
-			ResultPartitionManager resultPartitionManager,
-			TaskEventDispatcher taskEventDispatcher,
-			KvStateRegistry kvStateRegistry,
-			KvStateServer kvStateServer,
-			KvStateClientProxy kvStateClientProxy,
-			IOMode defaultIOMode,
-			int partitionRequestInitialBackoff,
-			int partitionRequestMaxBackoff,
-			int networkBuffersPerChannel,
-			int extraNetworkBuffersPerGate,
-			boolean enableCreditBased) {
+		int numBuffers,
+		int memorySegmentSize,
+		int partitionRequestInitialBackoff,
+		int partitionRequestMaxBackoff,
+		int networkBuffersPerChannel,
+		int extraNetworkBuffersPerGate,
+		boolean enableCreditBased) {
+		this(
+			new NetworkBufferPool(numBuffers, memorySegmentSize),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			null,
+			IOManager.IOMode.SYNC,
+			partitionRequestInitialBackoff,
+			partitionRequestMaxBackoff,
+			networkBuffersPerChannel,
+			extraNetworkBuffersPerGate,
+			enableCreditBased);
+	}
+
+	public NetworkEnvironment(
+		NetworkBufferPool networkBufferPool,
+		ConnectionManager connectionManager,
+		ResultPartitionManager resultPartitionManager,
+		TaskEventDispatcher taskEventDispatcher,
+		KvStateRegistry kvStateRegistry,
+		KvStateServer kvStateServer,
+		KvStateClientProxy kvStateClientProxy,
+		IOMode defaultIOMode,
+		int partitionRequestInitialBackoff,
+		int partitionRequestMaxBackoff,
+		int networkBuffersPerChannel,
+		int extraNetworkBuffersPerGate,
+		boolean enableCreditBased) {
 
 		this.networkBufferPool = checkNotNull(networkBufferPool);
 		this.connectionManager = checkNotNull(connectionManager);
@@ -209,8 +234,12 @@ public void setupPartition(ResultPartition partition) throws IOException {
 			int maxNumberOfMemorySegments = partition.getPartitionType().isBounded() ?
 				partition.getNumberOfSubpartitions() * networkBuffersPerChannel +
 					extraNetworkBuffersPerGate : Integer.MAX_VALUE;
+			// If the partition type is back pressure-free, we register with the buffer pool for
+			// callbacks to release memory.
 			bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfSubpartitions(),
-				maxNumberOfMemorySegments);
+				maxNumberOfMemorySegments,
+				partition.getPartitionType().hasBackPressure() ? Optional.empty() : Optional.of(partition));
+
 			partition.registerBufferPool(bufferPool);
 
 			resultPartitionManager.registerResultPartition(partition);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
index 2927fae1069..511eb65019e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPool.java
@@ -25,12 +25,6 @@
  */
 public interface BufferPool extends BufferProvider, BufferRecycler {
 
-	/**
-	 * The owner of this buffer pool to be called when memory needs to be released to avoid back
-	 * pressure.
-	 */
-	void setBufferPoolOwner(BufferPoolOwner owner);
-
 	/**
 	 * Destroys this buffer pool.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
index c90e3025237..de4c8e09e11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import java.io.IOException;
+import java.util.Optional;
 
 /**
  * A factory for buffer pools.
@@ -38,6 +39,21 @@
 	 */
 	BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException;
 
+	/**
+	 * Tries to create a buffer pool with an optional owner, which is guaranteed to provide at least the
+	 * number of required buffers.
+	 *
+	 * <p>The buffer pool is of dynamic size with at least <tt>numRequiredBuffers</tt> buffers.
+	 *
+	 * @param numRequiredBuffers
+	 * 		minimum number of network buffers in this pool
+	 * @param maxUsedBuffers
+	 * 		maximum number of network buffers this pool offers
+	 * 	@param owner
+	 * 	    the optional owner of this buffer pool to release memory when needed
+	 */
+	BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, Optional<BufferPoolOwner> owner) throws IOException;
+
 	/**
 	 * Destroy callback for updating factory book keeping.
 	 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 1596fded6f3..d7bfb603eaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -26,10 +26,9 @@
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A buffer pool used to manage a number of {@link Buffer} instances from the
@@ -86,7 +85,7 @@
 
 	private boolean isDestroyed;
 
-	private BufferPoolOwner owner;
+	private final Optional<BufferPoolOwner> owner;
 
 	/**
 	 * Local buffer pool based on the given <tt>networkBufferPool</tt> with a minimal number of
@@ -98,7 +97,7 @@
 	 * 		minimum number of network buffers
 	 */
 	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments) {
-		this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE);
+		this(networkBufferPool, numberOfRequiredMemorySegments, Integer.MAX_VALUE, Optional.empty());
 	}
 
 	/**
@@ -114,6 +113,27 @@
 	 */
 	LocalBufferPool(NetworkBufferPool networkBufferPool, int numberOfRequiredMemorySegments,
 			int maxNumberOfMemorySegments) {
+		this(networkBufferPool, numberOfRequiredMemorySegments, maxNumberOfMemorySegments, Optional.empty());
+	}
+
+	/**
+	 * Local buffer pool based on the given <tt>networkBufferPool</tt> and <tt>bufferPoolOwner</tt>
+	 * with a minimal and maximal number of network buffers being available.
+	 *
+	 * @param networkBufferPool
+	 * 		global network buffer pool to get buffers from
+	 * @param numberOfRequiredMemorySegments
+	 * 		minimum number of network buffers
+	 * @param maxNumberOfMemorySegments
+	 * 		maximum number of network buffers to allocate
+	 * 	@param owner
+	 * 		the optional owner of this buffer pool to release memory when needed
+	 */
+	LocalBufferPool(
+		NetworkBufferPool networkBufferPool,
+		int numberOfRequiredMemorySegments,
+		int maxNumberOfMemorySegments,
+		Optional<BufferPoolOwner> owner) {
 		checkArgument(maxNumberOfMemorySegments >= numberOfRequiredMemorySegments,
 			"Maximum number of memory segments (%s) should not be smaller than minimum (%s).",
 			maxNumberOfMemorySegments, numberOfRequiredMemorySegments);
@@ -129,6 +149,7 @@
 		this.numberOfRequiredMemorySegments = numberOfRequiredMemorySegments;
 		this.currentPoolSize = numberOfRequiredMemorySegments;
 		this.maxNumberOfMemorySegments = maxNumberOfMemorySegments;
+		this.owner = owner;
 	}
 
 	// ------------------------------------------------------------------------
@@ -176,14 +197,6 @@ public int bestEffortGetNumOfUsedBuffers() {
 		return Math.max(0, numberOfRequestedMemorySegments - availableMemorySegments.size());
 	}
 
-	@Override
-	public void setBufferPoolOwner(BufferPoolOwner owner) {
-		synchronized (availableMemorySegments) {
-			checkState(this.owner == null, "Buffer pool owner has already been set.");
-			this.owner = checkNotNull(owner);
-		}
-	}
-
 	@Override
 	public Buffer requestBuffer() throws IOException {
 		try {
@@ -222,7 +235,7 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 		synchronized (availableMemorySegments) {
 			returnExcessMemorySegments();
 
-			boolean askToRecycle = owner != null;
+			boolean askToRecycle = owner.isPresent();
 
 			// fill availableMemorySegments with at least one element, wait if required
 			while (availableMemorySegments.isEmpty()) {
@@ -240,7 +253,7 @@ private MemorySegment requestMemorySegment(boolean isBlocking) throws Interrupte
 				}
 
 				if (askToRecycle) {
-					owner.releaseMemory(1);
+					owner.get().releaseMemory(1);
 				}
 
 				if (isBlocking) {
@@ -336,6 +349,7 @@ public boolean addBufferListener(BufferListener listener) {
 
 	@Override
 	public void setNumBuffers(int numBuffers) throws IOException {
+		int numExcessBuffers;
 		synchronized (availableMemorySegments) {
 			checkArgument(numBuffers >= numberOfRequiredMemorySegments,
 					"Buffer pool needs at least %s buffers, but tried to set to %s",
@@ -349,11 +363,13 @@ public void setNumBuffers(int numBuffers) throws IOException {
 
 			returnExcessMemorySegments();
 
-			// If there is a registered owner and we have still requested more buffers than our
-			// size, trigger a recycle via the owner.
-			if (owner != null && numberOfRequestedMemorySegments > currentPoolSize) {
-				owner.releaseMemory(numberOfRequestedMemorySegments - currentPoolSize);
-			}
+			numExcessBuffers = numberOfRequestedMemorySegments - currentPoolSize;
+		}
+
+		// If there is a registered owner and we have still requested more buffers than our
+		// size, trigger a recycle via the owner.
+		if (owner.isPresent() && numExcessBuffers > 0) {
+			owner.get().releaseMemory(numExcessBuffers);
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index 1fddb612781..48ce27e90b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -33,6 +33,7 @@
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -253,6 +254,11 @@ public int countBuffers() {
 
 	@Override
 	public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) throws IOException {
+		return createBufferPool(numRequiredBuffers, maxUsedBuffers, Optional.empty());
+	}
+
+	@Override
+	public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers, Optional<BufferPoolOwner> owner) throws IOException {
 		// It is necessary to use a separate lock from the one used for buffer
 		// requests to ensure deadlock freedom for failure cases.
 		synchronized (factoryLock) {
@@ -281,7 +287,7 @@ public BufferPool createBufferPool(int numRequiredBuffers, int maxUsedBuffers) t
 			// We are good to go, create a new buffer pool and redistribute
 			// non-fixed size buffers.
 			LocalBufferPool localBufferPool =
-				new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers);
+				new LocalBufferPool(this, numRequiredBuffers, maxUsedBuffers, owner);
 
 			allBufferPools.add(localBufferPool);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index b32f73f8bcc..85fc5602ef3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -187,12 +187,6 @@ public void registerBufferPool(BufferPool bufferPool) {
 		checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");
 
 		this.bufferPool = checkNotNull(bufferPool);
-
-		// If the partition type is back pressure-free, we register with the buffer pool for
-		// callbacks to release memory.
-		if (!partitionType.hasBackPressure()) {
-			bufferPool.setBufferPoolOwner(this);
-		}
 	}
 
 	public JobID getJobId() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f0f1926b008..8c2fb7a15f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -31,7 +30,6 @@
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
@@ -79,21 +77,8 @@
 	 */
 	@Test
 	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-
 		final NetworkEnvironment network = new NetworkEnvironment(
-			new NetworkBufferPool(numBuffers, memorySegmentSize),
-			new LocalConnectionManager(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOManager.IOMode.SYNC,
-			0,
-			0,
-			2,
-			8,
-			enableCreditBasedFlowControl);
+			numBuffers, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl);
 
 		// result partitions
 		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -197,19 +182,7 @@ public void testRegisterTaskWithInsufficientBuffers() throws Exception {
 
 	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NetworkEnvironment network = new NetworkEnvironment(
-			new NetworkBufferPool(bufferPoolSize, memorySegmentSize),
-			new LocalConnectionManager(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOManager.IOMode.SYNC,
-			0,
-			0,
-			2,
-			8,
-			enableCreditBasedFlowControl);
+			bufferPoolSize, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl);
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 8d6be0c4ac8..4e8eec696fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -31,6 +31,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.nullValue;
@@ -323,11 +324,11 @@ public void testRequestMemorySegmentsExceptionDuringBufferRedistribution() throw
 
 		final List<Buffer> buffers = new ArrayList<>(numBuffers);
 		List<MemorySegment> memorySegments = Collections.emptyList();
-		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
-		// make releaseMemory calls always fail:
-		bufferPool.setBufferPoolOwner(numBuffersToRecycle -> {
-			throw new TestIOException();
-		});
+		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers,
+			// make releaseMemory calls always fail:
+			Optional.of(numBuffersToRecycle -> {
+				throw new TestIOException();
+		}));
 
 		try {
 			// take all but one buffer
@@ -363,11 +364,10 @@ public void testCreateBufferPoolExceptionDuringBufferRedistribution() throws IOE
 		final NetworkBufferPool networkBufferPool = new NetworkBufferPool(numBuffers, 128);
 
 		final List<Buffer> buffers = new ArrayList<>(numBuffers);
-		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers);
-		bufferPool.setBufferPoolOwner(
-			numBuffersToRecycle -> {
+		BufferPool bufferPool = networkBufferPool.createBufferPool(1, numBuffers,
+			Optional.of(numBuffersToRecycle -> {
 				throw new TestIOException();
-			});
+		}));
 
 		try {
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 1f9bb6bc0e7..ad4cf8ecef5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -21,6 +21,8 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -30,6 +32,7 @@
 import org.junit.Test;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -205,6 +208,54 @@ protected void testAddOnPartition(final ResultPartitionType pipelined)
 		}
 	}
 
+	@Test
+	public void testReleaseMemoryOnBlockingPartition() throws Exception {
+		testReleaseMemory(ResultPartitionType.BLOCKING);
+	}
+
+	@Test
+	public void testReleaseMemoryOnPipelinedPartition() throws Exception {
+		testReleaseMemory(ResultPartitionType.PIPELINED);
+	}
+
+	/**
+	 * Tests {@link ResultPartition#releaseMemory(int)} on a working partition.
+	 *
+	 * @param resultPartitionType the result partition type to set up
+	 */
+	private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception {
+		final int numAllBuffers = 10;
+		final NetworkEnvironment network = new NetworkEnvironment(numAllBuffers, 128, 0, 0, 2, 8, true);
+		final ResultPartitionConsumableNotifier notifier = new NoOpResultPartitionConsumableNotifier();
+		final ResultPartition resultPartition = createPartition(notifier, resultPartitionType, false);
+		try {
+			network.setupPartition(resultPartition);
+
+			// take all buffers (more than the minimum required)
+			for (int i = 0; i < numAllBuffers; ++i) {
+				BufferBuilder bufferBuilder = resultPartition.getBufferPool().requestBufferBuilderBlocking();
+				resultPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+			}
+			resultPartition.finish();
+
+			assertEquals(0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
+
+			// reset the pool size less than the number of requested buffers
+			final int numLocalBuffers = 4;
+			resultPartition.getBufferPool().setNumBuffers(numLocalBuffers);
+
+			// partition with blocking type should release excess buffers
+			if (!resultPartitionType.hasBackPressure()) {
+				assertEquals(numLocalBuffers, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
+			} else {
+				assertEquals(0, resultPartition.getBufferPool().getNumberOfAvailableMemorySegments());
+			}
+		} finally {
+			resultPartition.release();
+			network.shutdown();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static ResultPartition createPartition(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4bf5b220be8..63f18551130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -25,7 +25,6 @@
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -45,7 +44,6 @@
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
 import org.junit.Test;
@@ -71,7 +69,6 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -344,7 +341,8 @@ public void testRequestBackoffConfiguration() throws Exception {
 		int initialBackoff = 137;
 		int maxBackoff = 1001;
 
-		final NetworkEnvironment netEnv = createNetworkEnvironment(2, 8, initialBackoff, maxBackoff);
+		final NetworkEnvironment netEnv = new NetworkEnvironment(
+			100, 32, initialBackoff, maxBackoff, 2, 8, enableCreditBasedFlowControl);
 
 		SingleInputGate gate = SingleInputGate.create(
 			"TestTask",
@@ -403,8 +401,8 @@ public void testRequestBuffersWithRemoteInputChannel() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
 		int buffersPerChannel = 2;
 		int extraNetworkBuffersPerGate = 8;
-		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel,
-			extraNetworkBuffersPerGate, 0, 0);
+		final NetworkEnvironment network = new NetworkEnvironment(
+			100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 
 		try {
 			final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -415,8 +413,6 @@ public void testRequestBuffersWithRemoteInputChannel() throws Exception {
 
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 			if (enableCreditBasedFlowControl) {
-				verify(bufferPool,
-					times(1)).requestMemorySegments(buffersPerChannel);
 				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
 					.get(resultPartitionId.getPartitionId());
 				// only the exclusive buffers should be assigned/available now
@@ -444,7 +440,8 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
 		int buffersPerChannel = 2;
 		int extraNetworkBuffersPerGate = 8;
-		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, extraNetworkBuffersPerGate, 0, 0);
+		final NetworkEnvironment network = new NetworkEnvironment(
+			100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 
 		try {
 			final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -454,8 +451,6 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 
 			if (enableCreditBasedFlowControl) {
-				verify(bufferPool, times(0)).requestMemorySegments(buffersPerChannel);
-
 				assertEquals(bufferPool.getTotalNumberOfMemorySegments(),
 					bufferPool.getNumberOfAvailableMemorySegments());
 				// note: exclusive buffers are not handed out into LocalBufferPool and are thus not counted
@@ -471,8 +466,6 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 				ResultPartitionLocation.createRemote(connectionId)));
 
 			if (enableCreditBasedFlowControl) {
-				verify(bufferPool,
-					times(1)).requestMemorySegments(buffersPerChannel);
 				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
 					.get(resultPartitionId.getPartitionId());
 				// only the exclusive buffers should be assigned/available now
@@ -499,7 +492,8 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 	public void testUpdateUnknownInputChannel() throws Exception {
 		final SingleInputGate inputGate = createInputGate(2);
 		int buffersPerChannel = 2;
-		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, 8, 0, 0);
+		final NetworkEnvironment network = new NetworkEnvironment(
+			100, 32, 0, 0, buffersPerChannel, 8, enableCreditBasedFlowControl);
 
 		try {
 			final ResultPartitionID localResultPartitionId = new ResultPartitionID();
@@ -543,27 +537,6 @@ public void testUpdateUnknownInputChannel() throws Exception {
 
 	// ---------------------------------------------------------------------------------------------
 
-	private NetworkEnvironment createNetworkEnvironment(
-			int buffersPerChannel,
-			int extraNetworkBuffersPerGate,
-			int initialBackoff,
-			int maxBackoff) {
-		return new NetworkEnvironment(
-			spy(new NetworkBufferPool(100, 32)),
-			new LocalConnectionManager(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOManager.IOMode.SYNC,
-			initialBackoff,
-			maxBackoff,
-			buffersPerChannel,
-			extraNetworkBuffersPerGate,
-			enableCreditBasedFlowControl);
-	}
-
 	private SingleInputGate createInputGate() {
 		return createInputGate(2);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9669513a111..c3118c91f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -33,11 +33,7 @@
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -45,7 +41,6 @@
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -146,14 +141,8 @@ public void testComponentsStartupShutdown() throws Exception {
 			final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
-				new NetworkBufferPool(32, netConf.networkBufferSize()),
-				new LocalConnectionManager(),
-				new ResultPartitionManager(),
-				new TaskEventDispatcher(),
-				new KvStateRegistry(),
-				null,
-				null,
-				netConf.ioMode(),
+				32,
+				netConf.networkBufferSize(),
 				netConf.partitionRequestInitialBackoff(),
 				netConf.partitionRequestMaxBackoff(),
 				netConf.networkBuffersPerChannel(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services