You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/23 10:38:30 UTC
[flink] branch master updated: [FLINK-12497][network] Move ConnectionManager#start arguments to constructor
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0cb56a4 [FLINK-12497][network] Move ConnectionManager#start arguments to constructor
0cb56a4 is described below
commit 0cb56a4dfa9bade080730312ab94b6901c7fb91a
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 23 18:38:14 2019 +0800
[FLINK-12497][network] Move ConnectionManager#start arguments to constructor
---
.../flink/runtime/io/network/ConnectionManager.java | 3 +--
.../runtime/io/network/LocalConnectionManager.java | 3 +--
.../runtime/io/network/NetworkEnvironment.java | 9 ++++++---
.../io/network/netty/NettyConnectionManager.java | 21 +++++++++++++--------
.../network/netty/NettyConnectionManagerTest.java | 20 ++++++++------------
.../io/network/partition/InputChannelTestUtils.java | 3 +--
6 files changed, 30 insertions(+), 29 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index a84cd8a..75f39e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import java.io.IOException;
@@ -29,7 +28,7 @@ import java.io.IOException;
*/
public interface ConnectionManager {
- void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventDispatcher) throws IOException;
+ void start() throws IOException;
/**
* Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 11cab6b..46ca7fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
/**
* A connection manager implementation to bypass setup overhead for task managers running in local
@@ -28,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
public class LocalConnectionManager implements ConnectionManager {
@Override
- public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) {
+ public void start() {
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 1f2ee7e..7ee2a20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -120,8 +120,12 @@ public class NetworkEnvironment {
checkNotNull(config);
NettyConfig nettyConfig = config.nettyConfig();
+
+ ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
ConnectionManager connectionManager = nettyConfig != null ?
- new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager();
+ new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) :
+ new LocalConnectionManager();
NetworkBufferPool networkBufferPool = new NetworkBufferPool(
config.numNetworkBuffers(),
@@ -130,7 +134,6 @@ public class NetworkEnvironment {
registerNetworkMetrics(metricGroup, networkBufferPool);
- ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
resultPartitionManager,
ioManager,
@@ -280,7 +283,7 @@ public class NetworkEnvironment {
try {
LOG.debug("Starting network connection manager");
- connectionManager.start(resultPartitionManager, taskEventPublisher);
+ connectionManager.start();
} catch (IOException t) {
throw new IOException("Failed to instantiate network connection manager.", t);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 52e5a82..73d9b11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
public class NettyConnectionManager implements ConnectionManager {
private final NettyServer server;
@@ -35,24 +37,27 @@ public class NettyConnectionManager implements ConnectionManager {
private final PartitionRequestClientFactory partitionRequestClientFactory;
- private final boolean isCreditBased;
+ private final NettyProtocol nettyProtocol;
+
+ public NettyConnectionManager(
+ ResultPartitionProvider partitionProvider,
+ TaskEventPublisher taskEventPublisher,
+ NettyConfig nettyConfig,
+ boolean isCreditBased) {
- public NettyConnectionManager(NettyConfig nettyConfig, boolean isCreditBased) {
this.server = new NettyServer(nettyConfig);
this.client = new NettyClient(nettyConfig);
this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
- this.isCreditBased = isCreditBased;
+ this.nettyProtocol = new NettyProtocol(checkNotNull(partitionProvider), checkNotNull(taskEventPublisher), isCreditBased);
}
@Override
- public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) throws IOException {
- NettyProtocol partitionRequestProtocol = new NettyProtocol(partitionProvider, taskEventPublisher, isCreditBased);
-
- client.init(partitionRequestProtocol, bufferPool);
- server.init(partitionRequestProtocol, bufferPool);
+ public void start() throws IOException {
+ client.init(nettyProtocol, bufferPool);
+ server.init(nettyProtocol, bufferPool);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 5def8f7..f83e411 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
@@ -33,7 +33,6 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
/**
* Simple netty connection manager test.
@@ -57,11 +56,8 @@ public class NettyConnectionManagerTest {
numberOfSlots,
new Configuration());
- NettyConnectionManager connectionManager = new NettyConnectionManager(config, true);
-
- connectionManager.start(
- mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class));
+ NettyConnectionManager connectionManager = createNettyConnectionManager(config);
+ connectionManager.start();
assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
@@ -125,11 +121,8 @@ public class NettyConnectionManagerTest {
1337,
flinkConfig);
- NettyConnectionManager connectionManager = new NettyConnectionManager(config, true);
-
- connectionManager.start(
- mock(ResultPartitionProvider.class),
- mock(TaskEventDispatcher.class));
+ NettyConnectionManager connectionManager = createNettyConnectionManager(config);
+ connectionManager.start();
assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
@@ -170,4 +163,7 @@ public class NettyConnectionManagerTest {
}
}
+ private NettyConnectionManager createNettyConnectionManager(NettyConfig config) {
+ return new NettyConnectionManager(new ResultPartitionManager(), new TaskEventDispatcher(), config, true);
+ }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 49981ce..ece1009 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -141,7 +140,7 @@ public class InputChannelTestUtils {
public static ConnectionManager mockConnectionManagerWithPartitionRequestClient(PartitionRequestClient client) {
return new ConnectionManager() {
@Override
- public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventDispatcher) {
+ public void start() {
}
@Override