You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ga...@apache.org on 2019/05/31 13:12:28 UTC

[flink] 01/02: [FLINK-12571][network] Make NetworkEnvironment#start() return the binded data port

This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bfd53e9d3f5a221bca8ca82e5f2ab5399d3fa0fd
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue May 21 17:15:02 2019 +0800

    [FLINK-12571][network] Make NetworkEnvironment#start() return the binded data port
    
    NetworkEnvironment#getConnectionManager is currently used for getting binded data port from ConnectionManager. Considering the general shuffle service architecture,
    the internal ConnectionManager in NetworkEnvironment should not be exposed to outsides. We could make ShuffleService#start return the binded data port directly if
    exists, then for other cases it could return a default int value which seems no harm.
    
    This closes #8496.
---
 .../apache/flink/runtime/io/network/ConnectionManager.java |  9 ++++++---
 .../flink/runtime/io/network/LocalConnectionManager.java   |  8 ++------
 .../flink/runtime/io/network/NetworkEnvironment.java       | 10 ++++++++--
 .../runtime/io/network/netty/NettyConnectionManager.java   | 14 +++-----------
 .../apache/flink/runtime/io/network/netty/NettyServer.java |  8 +++-----
 .../flink/runtime/taskexecutor/TaskManagerServices.java    |  4 ++--
 .../flink/runtime/io/network/TestingConnectionManager.java |  9 +++------
 .../io/network/partition/InputChannelTestUtils.java        |  8 ++------
 .../io/benchmark/StreamNetworkBenchmarkEnvironment.java    |  6 ++++--
 9 files changed, 33 insertions(+), 43 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
index c342750..32ec02b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -26,7 +26,12 @@ import java.io.IOException;
  */
 public interface ConnectionManager {
 
-	void start() throws IOException;
+	/**
+	 * Starts the internal related components for network connection and communication.
+	 *
+	 * @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible.
+	 */
+	int start() throws IOException;
 
 	/**
 	 * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
@@ -40,8 +45,6 @@ public interface ConnectionManager {
 
 	int getNumberOfActiveConnections();
 
-	int getDataPort();
-
 	void shutdown() throws IOException;
 
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 319a9ea..5613d19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -25,7 +25,8 @@ package org.apache.flink.runtime.io.network;
 public class LocalConnectionManager implements ConnectionManager {
 
 	@Override
-	public void start() {
+	public int start() {
+		return -1;
 	}
 
 	@Override
@@ -42,10 +43,5 @@ public class LocalConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public int getDataPort() {
-		return -1;
-	}
-
-	@Override
 	public void shutdown() {}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 43969e2..561909c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -187,6 +187,7 @@ public class NetworkEnvironment {
 		return resultPartitionManager;
 	}
 
+	@VisibleForTesting
 	public ConnectionManager getConnectionManager() {
 		return connectionManager;
 	}
@@ -317,7 +318,12 @@ public class NetworkEnvironment {
 		return true;
 	}
 
-	public void start() throws IOException {
+	/*
+	 * Starts the internal related components for network connection and communication.
+	 *
+	 * @return a port to connect to the task executor for shuffle data exchange, -1 if only local connection is possible.
+	 */
+	public int start() throws IOException {
 		synchronized (lock) {
 			Preconditions.checkState(!isShutdown, "The NetworkEnvironment has already been shut down.");
 
@@ -325,7 +331,7 @@ public class NetworkEnvironment {
 
 			try {
 				LOG.debug("Starting network connection manager");
-				connectionManager.start();
+				return connectionManager.start();
 			} catch (IOException t) {
 				throw new IOException("Failed to instantiate network connection manager.", t);
 			}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index ef3db13..3e6a932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -56,9 +56,10 @@ public class NettyConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public void start() throws IOException {
+	public int start() throws IOException {
 		client.init(nettyProtocol, bufferPool);
-		server.init(nettyProtocol, bufferPool);
+
+		return server.init(nettyProtocol, bufferPool);
 	}
 
 	@Override
@@ -78,15 +79,6 @@ public class NettyConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public int getDataPort() {
-		if (server != null && server.getLocalAddress() != null) {
-			return server.getLocalAddress().getPort();
-		} else {
-			return -1;
-		}
-	}
-
-	@Override
 	public void shutdown() {
 		client.shutdown();
 		server.shutdown();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
index f818ff6..8bbda5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
@@ -64,7 +64,7 @@ class NettyServer {
 		localAddress = null;
 	}
 
-	void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
+	int init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException {
 		checkState(bootstrap == null, "Netty server has already been initialized.");
 
 		final long start = System.nanoTime();
@@ -164,6 +164,8 @@ class NettyServer {
 
 		final long duration = (System.nanoTime() - start) / 1_000_000;
 		LOG.info("Successful initialization (took {} ms). Listening on SocketAddress {}.", duration, localAddress);
+
+		return localAddress.getPort();
 	}
 
 	NettyConfig getConfig() {
@@ -174,10 +176,6 @@ class NettyServer {
 		return bootstrap;
 	}
 
-	public InetSocketAddress getLocalAddress() {
-		return localAddress;
-	}
-
 	void shutdown() {
 		final long start = System.nanoTime();
 		if (bindFuture != null) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e19e8fc..6e9b864 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -248,7 +248,7 @@ public class TaskManagerServices {
 
 		final NetworkEnvironment network = NetworkEnvironment.create(
 			taskManagerServicesConfiguration.getNetworkConfig(), taskEventDispatcher, taskManagerMetricGroup, ioManager);
-		network.start();
+		int dataPort = network.start();
 
 		final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
 		kvStateService.start();
@@ -256,7 +256,7 @@ public class TaskManagerServices {
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(
 			resourceID,
 			taskManagerServicesConfiguration.getTaskManagerAddress(),
-			network.getConnectionManager().getDataPort());
+			dataPort);
 
 		// this call has to happen strictly after the network stack has been initialized
 		final MemoryManager memoryManager = createMemoryManager(taskManagerServicesConfiguration, freeHeapMemoryWithDefrag, maxJvmHeapMemory);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
index c23b3c2..19203fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/TestingConnectionManager.java
@@ -27,7 +27,9 @@ import java.io.IOException;
 public class TestingConnectionManager implements ConnectionManager {
 
 	@Override
-	public void start() {}
+	public int start() {
+		return -1;
+	}
 
 	@Override
 	public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException {
@@ -43,10 +45,5 @@ public class TestingConnectionManager implements ConnectionManager {
 	}
 
 	@Override
-	public int getDataPort() {
-		return -1;
-	}
-
-	@Override
 	public void shutdown() {}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 4ff472e..16d6cab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -140,7 +140,8 @@ public class InputChannelTestUtils {
 	public static ConnectionManager mockConnectionManagerWithPartitionRequestClient(PartitionRequestClient client) {
 		return new ConnectionManager() {
 			@Override
-			public void start() {
+			public int start() {
+				return -1;
 			}
 
 			@Override
@@ -158,11 +159,6 @@ public class InputChannelTestUtils {
 			}
 
 			@Override
-			public int getDataPort() {
-				return 0;
-			}
-
-			@Override
 			public void shutdown() {
 			}
 		};
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index fccb5db..4b28961 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -89,6 +89,8 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 
 	protected ResultPartitionID[] partitionIds;
 
+	private int dataPort;
+
 	public void setUp(
 			int writers,
 			int channels,
@@ -141,7 +143,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 		ioManager = new IOManagerAsync();
 
 		senderEnv = createNettyNetworkEnvironment(senderBufferPoolSize, config);
-		senderEnv.start();
+		this.dataPort = senderEnv.start();
 		if (localMode && senderBufferPoolSize == receiverBufferPoolSize) {
 			receiverEnv = senderEnv;
 		}
@@ -163,7 +165,7 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 		TaskManagerLocation senderLocation = new TaskManagerLocation(
 			ResourceID.generate(),
 			LOCAL_ADDRESS,
-			senderEnv.getConnectionManager().getDataPort());
+			dataPort);
 
 		InputGate receiverGate = createInputGate(
 			dataSetID,