You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/23 10:38:30 UTC

[flink] branch master updated: [FLINK-12497][network] Move ConnectionManager#start arguments to constructor

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cb56a4  [FLINK-12497][network] Move ConnectionManager#start arguments to constructor
0cb56a4 is described below

commit 0cb56a4dfa9bade080730312ab94b6901c7fb91a
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 23 18:38:14 2019 +0800

    [FLINK-12497][network] Move ConnectionManager#start arguments to constructor
---
 .../flink/runtime/io/network/ConnectionManager.java |  3 +--
 .../runtime/io/network/LocalConnectionManager.java  |  3 +--
 .../runtime/io/network/NetworkEnvironment.java      |  9 ++++++---
 .../io/network/netty/NettyConnectionManager.java    | 21 +++++++++++++--------
 .../network/netty/NettyConnectionManagerTest.java   | 20 ++++++++------------
 .../io/network/partition/InputChannelTestUtils.java |  3 +--
 6 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index a84cd8a..75f39e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
 
@@ -29,7 +28,7 @@ import java.io.IOException;
  */
 public interface ConnectionManager {
 
-	void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventDispatcher) throws IOException;
+	void start() throws IOException;
 
 	/**
 	 * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 11cab6b..46ca7fc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 /**
  * A connection manager implementation to bypass setup overhead for task managers running in local
@@ -28,7 +27,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 public class LocalConnectionManager implements ConnectionManager {
 
 	@Override
-	public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) {
+	public void start() {
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 1f2ee7e..7ee2a20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -120,8 +120,12 @@ public class NetworkEnvironment {
 		checkNotNull(config);
 
 		NettyConfig nettyConfig = config.nettyConfig();
+
+		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
+
 		ConnectionManager connectionManager = nettyConfig != null ?
-			new NettyConnectionManager(nettyConfig, config.isCreditBased()) : new LocalConnectionManager();
+			new NettyConnectionManager(resultPartitionManager, taskEventPublisher, nettyConfig, config.isCreditBased()) :
+			new LocalConnectionManager();
 
 		NetworkBufferPool networkBufferPool = new NetworkBufferPool(
 			config.numNetworkBuffers(),
@@ -130,7 +134,6 @@ public class NetworkEnvironment {
 
 		registerNetworkMetrics(metricGroup, networkBufferPool);
 
-		ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			resultPartitionManager,
 			ioManager,
@@ -280,7 +283,7 @@ public class NetworkEnvironment {
 
 			try {
 				LOG.debug("Starting network connection manager");
-				connectionManager.start(resultPartitionManager, taskEventPublisher);
+				connectionManager.start();
 			} catch (IOException t) {
 				throw new IOException("Failed to instantiate network connection manager.", t);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 52e5a82..73d9b11 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 
 import java.io.IOException;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 public class NettyConnectionManager implements ConnectionManager {
 
 	private final NettyServer server;
@@ -35,24 +37,27 @@ public class NettyConnectionManager implements ConnectionManager {
 
 	private final PartitionRequestClientFactory partitionRequestClientFactory;
 
-	private final boolean isCreditBased;
+	private final NettyProtocol nettyProtocol;
+
+	public NettyConnectionManager(
+		ResultPartitionProvider partitionProvider,
+		TaskEventPublisher taskEventPublisher,
+		NettyConfig nettyConfig,
+		boolean isCreditBased) {
 
-	public NettyConnectionManager(NettyConfig nettyConfig, boolean isCreditBased) {
 		this.server = new NettyServer(nettyConfig);
 		this.client = new NettyClient(nettyConfig);
 		this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());
 
 		this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
 
-		this.isCreditBased = isCreditBased;
+		this.nettyProtocol = new NettyProtocol(checkNotNull(partitionProvider), checkNotNull(taskEventPublisher), isCreditBased);
 	}
 
 	@Override
-	public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventPublisher) throws IOException {
-		NettyProtocol partitionRequestProtocol = new NettyProtocol(partitionProvider, taskEventPublisher, isCreditBased);
-
-		client.init(partitionRequestProtocol, bufferPool);
-		server.init(partitionRequestProtocol, bufferPool);
+	public void start() throws IOException {
+		client.init(nettyProtocol, bufferPool);
+		server.init(nettyProtocol, bufferPool);
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 5def8f7..f83e411 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
@@ -33,7 +33,6 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
 
 /**
  * Simple netty connection manager test.
@@ -57,11 +56,8 @@ public class NettyConnectionManagerTest {
 				numberOfSlots,
 				new Configuration());
 
-		NettyConnectionManager connectionManager = new NettyConnectionManager(config, true);
-
-		connectionManager.start(
-				mock(ResultPartitionProvider.class),
-				mock(TaskEventDispatcher.class));
+		NettyConnectionManager connectionManager = createNettyConnectionManager(config);
+		connectionManager.start();
 
 		assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
 
@@ -125,11 +121,8 @@ public class NettyConnectionManagerTest {
 				1337,
 				flinkConfig);
 
-		NettyConnectionManager connectionManager = new NettyConnectionManager(config, true);
-
-		connectionManager.start(
-				mock(ResultPartitionProvider.class),
-				mock(TaskEventDispatcher.class));
+		NettyConnectionManager connectionManager = createNettyConnectionManager(config);
+		connectionManager.start();
 
 		assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
 
@@ -170,4 +163,7 @@ public class NettyConnectionManagerTest {
 		}
 	}
 
+	private NettyConnectionManager createNettyConnectionManager(NettyConfig config) {
+		return new NettyConnectionManager(new ResultPartitionManager(), new TaskEventDispatcher(), config, true);
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 49981ce..ece1009 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -22,7 +22,6 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -141,7 +140,7 @@ public class InputChannelTestUtils {
 	public static ConnectionManager mockConnectionManagerWithPartitionRequestClient(PartitionRequestClient client) {
 		return new ConnectionManager() {
 			@Override
-			public void start(ResultPartitionProvider partitionProvider, TaskEventPublisher taskEventDispatcher) {
+			public void start() {
 			}
 
 			@Override