You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/23 13:23:58 UTC

[GitHub] pnowojski closed pull request #6891: [FLINK-10606][network][test] Construct NetworkEnvironment simple for tests

pnowojski closed pull request #6891: [FLINK-10606][network][test] Construct NetworkEnvironment simple for tests
URL: https://github.com/apache/flink/pull/6891
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), GitHub does not display
its diff once it has been merged, so the diff is supplied below:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index f2547563872..1363175e96e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -90,19 +90,43 @@
 	private boolean isShutdown;
 
 	public NetworkEnvironment(
-			NetworkBufferPool networkBufferPool,
-			ConnectionManager connectionManager,
-			ResultPartitionManager resultPartitionManager,
-			TaskEventDispatcher taskEventDispatcher,
-			KvStateRegistry kvStateRegistry,
-			KvStateServer kvStateServer,
-			KvStateClientProxy kvStateClientProxy,
-			IOMode defaultIOMode,
-			int partitionRequestInitialBackoff,
-			int partitionRequestMaxBackoff,
-			int networkBuffersPerChannel,
-			int extraNetworkBuffersPerGate,
-			boolean enableCreditBased) {
+		int numBuffers,
+		int memorySegmentSize,
+		int partitionRequestInitialBackoff,
+		int partitionRequestMaxBackoff,
+		int networkBuffersPerChannel,
+		int extraNetworkBuffersPerGate,
+		boolean enableCreditBased) {
+		this(
+			new NetworkBufferPool(numBuffers, memorySegmentSize),
+			new LocalConnectionManager(),
+			new ResultPartitionManager(),
+			new TaskEventDispatcher(),
+			new KvStateRegistry(),
+			null,
+			null,
+			IOManager.IOMode.SYNC,
+			partitionRequestInitialBackoff,
+			partitionRequestMaxBackoff,
+			networkBuffersPerChannel,
+			extraNetworkBuffersPerGate,
+			enableCreditBased);
+	}
+
+	public NetworkEnvironment(
+		NetworkBufferPool networkBufferPool,
+		ConnectionManager connectionManager,
+		ResultPartitionManager resultPartitionManager,
+		TaskEventDispatcher taskEventDispatcher,
+		KvStateRegistry kvStateRegistry,
+		KvStateServer kvStateServer,
+		KvStateClientProxy kvStateClientProxy,
+		IOMode defaultIOMode,
+		int partitionRequestInitialBackoff,
+		int partitionRequestMaxBackoff,
+		int networkBuffersPerChannel,
+		int extraNetworkBuffersPerGate,
+		boolean enableCreditBased) {
 
 		this.networkBufferPool = checkNotNull(networkBufferPool);
 		this.connectionManager = checkNotNull(connectionManager);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f0f1926b008..8c2fb7a15f0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -31,7 +30,6 @@
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
@@ -79,21 +77,8 @@
 	 */
 	@Test
 	public void testRegisterTaskUsesBoundedBuffers() throws Exception {
-
 		final NetworkEnvironment network = new NetworkEnvironment(
-			new NetworkBufferPool(numBuffers, memorySegmentSize),
-			new LocalConnectionManager(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOManager.IOMode.SYNC,
-			0,
-			0,
-			2,
-			8,
-			enableCreditBasedFlowControl);
+			numBuffers, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl);
 
 		// result partitions
 		ResultPartition rp1 = createResultPartition(ResultPartitionType.PIPELINED, 2);
@@ -197,19 +182,7 @@ public void testRegisterTaskWithInsufficientBuffers() throws Exception {
 
 	private void testRegisterTaskWithLimitedBuffers(int bufferPoolSize) throws Exception {
 		final NetworkEnvironment network = new NetworkEnvironment(
-			new NetworkBufferPool(bufferPoolSize, memorySegmentSize),
-			new LocalConnectionManager(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOManager.IOMode.SYNC,
-			0,
-			0,
-			2,
-			8,
-			enableCreditBasedFlowControl);
+			bufferPoolSize, memorySegmentSize, 0, 0, 2, 8, enableCreditBasedFlowControl);
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4bf5b220be8..63f18551130 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -25,7 +25,6 @@
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
@@ -45,7 +44,6 @@
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
 import org.junit.Test;
@@ -71,7 +69,6 @@
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -344,7 +341,8 @@ public void testRequestBackoffConfiguration() throws Exception {
 		int initialBackoff = 137;
 		int maxBackoff = 1001;
 
-		final NetworkEnvironment netEnv = createNetworkEnvironment(2, 8, initialBackoff, maxBackoff);
+		final NetworkEnvironment netEnv = new NetworkEnvironment(
+			100, 32, initialBackoff, maxBackoff, 2, 8, enableCreditBasedFlowControl);
 
 		SingleInputGate gate = SingleInputGate.create(
 			"TestTask",
@@ -403,8 +401,8 @@ public void testRequestBuffersWithRemoteInputChannel() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
 		int buffersPerChannel = 2;
 		int extraNetworkBuffersPerGate = 8;
-		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel,
-			extraNetworkBuffersPerGate, 0, 0);
+		final NetworkEnvironment network = new NetworkEnvironment(
+			100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 
 		try {
 			final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -415,8 +413,6 @@ public void testRequestBuffersWithRemoteInputChannel() throws Exception {
 
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 			if (enableCreditBasedFlowControl) {
-				verify(bufferPool,
-					times(1)).requestMemorySegments(buffersPerChannel);
 				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
 					.get(resultPartitionId.getPartitionId());
 				// only the exclusive buffers should be assigned/available now
@@ -444,7 +440,8 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1, ResultPartitionType.PIPELINED_BOUNDED);
 		int buffersPerChannel = 2;
 		int extraNetworkBuffersPerGate = 8;
-		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, extraNetworkBuffersPerGate, 0, 0);
+		final NetworkEnvironment network = new NetworkEnvironment(
+			100, 32, 0, 0, buffersPerChannel, extraNetworkBuffersPerGate, enableCreditBasedFlowControl);
 
 		try {
 			final ResultPartitionID resultPartitionId = new ResultPartitionID();
@@ -454,8 +451,6 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 			NetworkBufferPool bufferPool = network.getNetworkBufferPool();
 
 			if (enableCreditBasedFlowControl) {
-				verify(bufferPool, times(0)).requestMemorySegments(buffersPerChannel);
-
 				assertEquals(bufferPool.getTotalNumberOfMemorySegments(),
 					bufferPool.getNumberOfAvailableMemorySegments());
 				// note: exclusive buffers are not handed out into LocalBufferPool and are thus not counted
@@ -471,8 +466,6 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 				ResultPartitionLocation.createRemote(connectionId)));
 
 			if (enableCreditBasedFlowControl) {
-				verify(bufferPool,
-					times(1)).requestMemorySegments(buffersPerChannel);
 				RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
 					.get(resultPartitionId.getPartitionId());
 				// only the exclusive buffers should be assigned/available now
@@ -499,7 +492,8 @@ public void testRequestBuffersWithUnknownInputChannel() throws Exception {
 	public void testUpdateUnknownInputChannel() throws Exception {
 		final SingleInputGate inputGate = createInputGate(2);
 		int buffersPerChannel = 2;
-		final NetworkEnvironment network = createNetworkEnvironment(buffersPerChannel, 8, 0, 0);
+		final NetworkEnvironment network = new NetworkEnvironment(
+			100, 32, 0, 0, buffersPerChannel, 8, enableCreditBasedFlowControl);
 
 		try {
 			final ResultPartitionID localResultPartitionId = new ResultPartitionID();
@@ -543,27 +537,6 @@ public void testUpdateUnknownInputChannel() throws Exception {
 
 	// ---------------------------------------------------------------------------------------------
 
-	private NetworkEnvironment createNetworkEnvironment(
-			int buffersPerChannel,
-			int extraNetworkBuffersPerGate,
-			int initialBackoff,
-			int maxBackoff) {
-		return new NetworkEnvironment(
-			spy(new NetworkBufferPool(100, 32)),
-			new LocalConnectionManager(),
-			new ResultPartitionManager(),
-			new TaskEventDispatcher(),
-			new KvStateRegistry(),
-			null,
-			null,
-			IOManager.IOMode.SYNC,
-			initialBackoff,
-			maxBackoff,
-			buffersPerChannel,
-			extraNetworkBuffersPerGate,
-			enableCreditBasedFlowControl);
-	}
-
 	private SingleInputGate createInputGate() {
 		return createInputGate(2);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 9669513a111..c3118c91f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -33,11 +33,7 @@
 import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -45,7 +41,6 @@
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -146,14 +141,8 @@ public void testComponentsStartupShutdown() throws Exception {
 			final MemoryManager memManager = new MemoryManager(networkBufNum * BUFFER_SIZE, 1, BUFFER_SIZE, MemoryType.HEAP, false);
 			final IOManager ioManager = new IOManagerAsync(TMP_DIR);
 			final NetworkEnvironment network = new NetworkEnvironment(
-				new NetworkBufferPool(32, netConf.networkBufferSize()),
-				new LocalConnectionManager(),
-				new ResultPartitionManager(),
-				new TaskEventDispatcher(),
-				new KvStateRegistry(),
-				null,
-				null,
-				netConf.ioMode(),
+				32,
+				netConf.networkBufferSize(),
 				netConf.partitionRequestInitialBackoff(),
 				netConf.partitionRequestMaxBackoff(),
 				netConf.networkBuffersPerChannel(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services