You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:29 UTC

[flink] 04/10: [hotfix][tests][network] Introduce NetworkEnvironment.create factory method

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e677bb6dcc38dfb3b5bf8b20e4a2d324b9e20ab5
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Sun May 12 12:55:01 2019 +0200

    [hotfix][tests][network] Introduce NetworkEnvironment.create factory method
---
 .../runtime/io/network/NetworkEnvironment.java     | 47 +++++++++++++++-------
 .../runtime/taskexecutor/TaskManagerServices.java  |  2 +-
 .../io/network/NetworkEnvironmentBuilder.java      |  2 +-
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b9b35c9..ea482f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -92,31 +92,50 @@ public class NetworkEnvironment {
 
 	private boolean isShutdown;
 
-	public NetworkEnvironment(
+	private NetworkEnvironment(
 			NetworkEnvironmentConfiguration config,
+			NetworkBufferPool networkBufferPool,
+			ConnectionManager connectionManager,
+			ResultPartitionManager resultPartitionManager,
 			TaskEventPublisher taskEventPublisher,
-			MetricGroup metricGroup,
 			IOManager ioManager) {
-		this.config = checkNotNull(config);
+		this.config = config;
+		this.networkBufferPool = networkBufferPool;
+		this.connectionManager = connectionManager;
+		this.resultPartitionManager = resultPartitionManager;
+		this.taskEventPublisher = taskEventPublisher;
+		this.ioManager = ioManager;
+		this.isShutdown = false;
+	}
 
-		this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+	public static NetworkEnvironment create(
+			NetworkEnvironmentConfiguration config,
+			TaskEventPublisher taskEventPublisher,
+			MetricGroup metricGroup,
+			IOManager ioManager) {
+		checkNotNull(ioManager);
+		checkNotNull(taskEventPublisher);
+		checkNotNull(config);
 
 		NettyConfig nettyConfig = config.nettyConfig();
-		if (nettyConfig != null) {
-			this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased());
-		} else {
-			this.connectionManager = new LocalConnectionManager();
-		}
-
-		this.resultPartitionManager = new ResultPartitionManager();
+		ConnectionManager connectionManager = nettyConfig != null ?
+			new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager();
 
-		this.taskEventPublisher = checkNotNull(taskEventPublisher);
+		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+			config.numNetworkBuffers(),
+			config.networkBufferSize());
 
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
-		this.ioManager = checkNotNull(ioManager);
+		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
 
-		isShutdown = false;
+		return new NetworkEnvironment(
+			config,
+			networkBufferPool,
+			connectionManager,
+			resultPartitionManager,
+			taskEventPublisher,
+			ioManager);
 	}
 
 	private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e2ec942..e19e8fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -246,7 +246,7 @@ public class TaskManagerServices {
 		// start the I/O manager, it will create some temp directories.
 		final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
 
-		final NetworkEnvironment network = new NetworkEnvironment(
+		final NetworkEnvironment network = NetworkEnvironment.create(
 			taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup, ioManager);
 		network.start();
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
index 5e529fd..e510be4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
@@ -110,7 +110,7 @@ public class NetworkEnvironmentBuilder {
 	}
 
 	public NetworkEnvironment build() {
-		return new NetworkEnvironment(
+		return NetworkEnvironment.create(
 			new NetworkEnvironmentConfiguration(
 				numNetworkBuffers,
 				networkBufferSize,