You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:29 UTC
[flink] 04/10: [hotfix][tests][network] Introduce NetworkEnvironment.create factory method
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e677bb6dcc38dfb3b5bf8b20e4a2d324b9e20ab5
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Sun May 12 12:55:01 2019 +0200
[hotfix][tests][network] Introduce NetworkEnvironment.create factory method
---
.../runtime/io/network/NetworkEnvironment.java | 47 +++++++++++++++-------
.../runtime/taskexecutor/TaskManagerServices.java | 2 +-
.../io/network/NetworkEnvironmentBuilder.java | 2 +-
3 files changed, 35 insertions(+), 16 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index b9b35c9..ea482f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -92,31 +92,50 @@ public class NetworkEnvironment {
private boolean isShutdown;
- public NetworkEnvironment(
+ private NetworkEnvironment(
NetworkEnvironmentConfiguration config,
+ NetworkBufferPool networkBufferPool,
+ ConnectionManager connectionManager,
+ ResultPartitionManager resultPartitionManager,
TaskEventPublisher taskEventPublisher,
- MetricGroup metricGroup,
IOManager ioManager) {
- this.config = checkNotNull(config);
+ this.config = config;
+ this.networkBufferPool = networkBufferPool;
+ this.connectionManager = connectionManager;
+ this.resultPartitionManager = resultPartitionManager;
+ this.taskEventPublisher = taskEventPublisher;
+ this.ioManager = ioManager;
+ this.isShutdown = false;
+ }
- this.networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+ public static NetworkEnvironment create(
+ NetworkEnvironmentConfiguration config,
+ TaskEventPublisher taskEventPublisher,
+ MetricGroup metricGroup,
+ IOManager ioManager) {
+ checkNotNull(ioManager);
+ checkNotNull(taskEventPublisher);
+ checkNotNull(config);
NettyConfig nettyConfig = config.nettyConfig();
- if (nettyConfig != null) {
- this.connectionManager = new NettyConnectionManager(nettyConfig, config.isCreditBased());
- } else {
- this.connectionManager = new LocalConnectionManager();
- }
-
- this.resultPartitionManager = new ResultPartitionManager();
+ ConnectionManager connectionManager = nettyConfig != null ?
+ new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager();
- this.taskEventPublisher = checkNotNull(taskEventPublisher);
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+ config.numNetworkBuffers(),
+ config.networkBufferSize());
registerNetworkMetrics(metricGroup, networkBufferPool);
- this.ioManager = checkNotNull(ioManager);
+ ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
- isShutdown = false;
+ return new NetworkEnvironment(
+ config,
+ networkBufferPool,
+ connectionManager,
+ resultPartitionManager,
+ taskEventPublisher,
+ ioManager);
}
private static void registerNetworkMetrics(MetricGroup metricGroup, NetworkBufferPool networkBufferPool) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e2ec942..e19e8fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -246,7 +246,7 @@ public class TaskManagerServices {
// start the I/O manager, it will create some temp directories.
final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
- final NetworkEnvironment network = new NetworkEnvironment(
+ final NetworkEnvironment network = NetworkEnvironment.create(
taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup, ioManager);
network.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
index 5e529fd..e510be4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentBuilder.java
@@ -110,7 +110,7 @@ public class NetworkEnvironmentBuilder {
}
public NetworkEnvironment build() {
- return new NetworkEnvironment(
+ return NetworkEnvironment.create(
new NetworkEnvironmentConfiguration(
numNetworkBuffers,
networkBufferSize,