You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/05 14:39:57 UTC
[flink] branch master updated: [FLINK-22821][core] Stabilize NetUtils#getAvailablePort in order to avoid wrongly allocating any used ports
This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dd1fddb [FLINK-22821][core] Stabilize NetUtils#getAvailablePort in order to avoid wrongly allocating any used ports
dd1fddb is described below
commit dd1fddb13b2d08ade580e5b3ec6b8e910974308d
Author: Yao Zhang <xz...@126.com>
AuthorDate: Wed Jun 2 16:17:02 2021 +0800
[FLINK-22821][core] Stabilize NetUtils#getAvailablePort in order to avoid wrongly allocating any used ports
---
.../apache/flink/client/program/ClientTest.java | 15 +-
.../connectors/kafka/KafkaTestEnvironmentImpl.java | 34 +++--
.../main/java/org/apache/flink/util/FileLock.java | 166 +++++++++++++++++++++
.../main/java/org/apache/flink/util/NetUtils.java | 36 ++++-
.../apache/flink/client/python/PythonEnvUtils.java | 4 +-
.../flink/queryablestate/network/ClientTest.java | 4 +-
.../io/network/netty/NettyClientServerSslTest.java | 103 ++++++++-----
.../network/netty/NettyConnectionManagerTest.java | 49 +++---
.../netty/NettyPartitionRequestClientTest.java | 23 +--
.../runtime/io/network/netty/NettyTestUtil.java | 6 +-
.../netty/PartitionRequestClientFactoryTest.java | 3 -
.../taskexecutor/TaskExecutorSubmissionTest.java | 76 +++++-----
.../runtime/taskexecutor/TaskExecutorTest.java | 46 +++---
.../flink/test/runtime/IPv6HostnamesITCase.java | 18 +--
14 files changed, 416 insertions(+), 167 deletions(-)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 19d8e22..4d8e47d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -56,6 +56,7 @@ import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
+import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
@@ -86,6 +87,8 @@ public class ClientTest extends TestLogger {
private Plan plan;
+ private NetUtils.Port port;
+
private Configuration config;
private static final String TEST_EXECUTOR_NAME = "test_executor";
@@ -102,14 +105,22 @@ public class ClientTest extends TestLogger {
env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>());
plan = env.createProgramPlan();
- final int freePort = NetUtils.getAvailablePort();
config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
- config.setInteger(JobManagerOptions.PORT, freePort);
+ // Assign the field (not a shadowing local) so that tearDown() can release the port lock.
+ port = NetUtils.getAvailablePort();
+ config.setInteger(JobManagerOptions.PORT, port.getPort());
+
config.set(
AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
}
+    /** Closes the reserved port held in the {@code port} field, if there is one. */
+    @After
+    public void tearDown() throws Exception {
+        if (port == null) {
+            return;
+        }
+        port.close();
+    }
+
private Configuration fromPackagedProgram(
final PackagedProgram program, final int parallelism, final boolean detached) {
final Configuration configuration = new Configuration();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 47ca5a7..b601680 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -119,7 +119,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
zookeeper = null;
brokers.clear();
- zookeeper = new TestingServer(-1, tmpZkDir);
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ zookeeper = new TestingServer(port.getPort(), tmpZkDir);
+ }
+
zookeeperConnectionString = zookeeper.getConnectString();
LOG.info(
"Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
@@ -427,22 +430,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
final int numTries = 5;
for (int i = 1; i <= numTries; i++) {
- int kafkaPort = NetUtils.getAvailablePort();
- kafkaProperties.put("port", Integer.toString(kafkaPort));
-
- // to support secure kafka cluster
- if (config.isSecureMode()) {
- LOG.info("Adding Kafka secure configurations");
- kafkaProperties.put(
- "listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
- kafkaProperties.put(
- "advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
- kafkaProperties.putAll(getSecureProperties());
- }
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ int kafkaPort = port.getPort();
+ kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+ // to support secure kafka cluster
+ if (config.isSecureMode()) {
+ LOG.info("Adding Kafka secure configurations");
+ kafkaProperties.put(
+ "listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.put(
+ "advertised.listeners",
+ "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+ kafkaProperties.putAll(getSecureProperties());
+ }
- KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+ KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
- try {
scala.Option<String> stringNone = scala.Option.apply(null);
KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, false);
server.startup();
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileLock.java b/flink-core/src/main/java/org/apache/flink/util/FileLock.java
new file mode 100644
index 0000000..c247c94
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FileLock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * A file lock used for avoiding race condition among multiple threads/processes.
+ *
+ * <p>The lock is taken on the channel of a {@link FileOutputStream} opened on the locking file.
+ * The stream is opened lazily by the first call to {@link #tryLock()}.
+ */
+@Internal
+public class FileLock {
+    private static final String TEMP_DIR = System.getProperty("java.io.tmpdir");
+    private final File file;
+    private FileOutputStream outputStream;
+    private java.nio.channels.FileLock lock;
+
+    /**
+     * Initialize a FileLock using a file located at fullPath.
+     *
+     * @param fullPath The path of the locking file
+     * @throws IllegalArgumentException If the path has no file name or the file name contains no
+     *     legal characters
+     */
+    public FileLock(String fullPath) {
+        Preconditions.checkNotNull(fullPath, "fullPath should not be null");
+        Path path = Paths.get(fullPath);
+        // getFileName() returns null for a path without name elements (e.g. a root path).
+        Path namePart = path.getFileName();
+        String normalizedFileName =
+                namePart == null ? "" : normalizeFileName(namePart.toString());
+        if (normalizedFileName.isEmpty()) {
+            throw new IllegalArgumentException("There are no legal characters in the file name");
+        }
+        this.file =
+                path.getParent() == null
+                        ? new File(normalizedFileName)
+                        : new File(path.getParent().toString(), normalizedFileName);
+    }
+
+    /**
+     * Initialize a FileLock using a file located at parentDir/fileName.
+     *
+     * @param parentDir The parent dir of the locking file
+     * @param fileName The name of the locking file
+     */
+    public FileLock(String parentDir, String fileName) {
+        Preconditions.checkNotNull(parentDir, "parentDir should not be null");
+        Preconditions.checkNotNull(fileName, "fileName should not be null");
+        this.file = new File(parentDir, normalizeFileName(fileName));
+    }
+
+    /**
+     * Initialize a FileLock using a file located inside temp folder.
+     *
+     * @param fileName The name of the locking file
+     * @return The initialized FileLock
+     */
+    public static FileLock inTempFolder(String fileName) {
+        return new FileLock(TEMP_DIR, fileName);
+    }
+
+    /**
+     * Check whether the locking file exists in the file system. Create it if it does not exist.
+     * Then create a FileOutputStream for it.
+     *
+     * @throws IOException If the file path is invalid or the parent dir does not exist
+     */
+    private void init() throws IOException {
+        if (!this.file.exists()) {
+            this.file.createNewFile();
+        }
+        outputStream = new FileOutputStream(this.file);
+    }
+
+    /**
+     * Try to acquire a lock on the locking file. This method returns immediately, whether or not
+     * the lock was acquired.
+     *
+     * @return True if successfully acquired the lock
+     * @throws IOException If the file path is invalid
+     */
+    public boolean tryLock() throws IOException {
+        if (outputStream == null) {
+            init();
+        }
+        try {
+            lock = outputStream.getChannel().tryLock();
+        } catch (Exception e) {
+            // Any failure to lock (e.g. an overlapping lock already held inside this JVM) is
+            // reported as "not acquired" instead of being propagated.
+            return false;
+        }
+
+        return lock != null;
+    }
+
+    /**
+     * Release the file lock.
+     *
+     * @throws IOException If the FileChannel is closed
+     */
+    public void unlock() throws IOException {
+        if (lock != null && lock.channel().isOpen()) {
+            lock.release();
+        }
+    }
+
+    /**
+     * Release the file lock, close the FileChannel and FileOutputStream, then delete the locking
+     * file if no other file lock needs it, which means the lock will not be used anymore.
+     *
+     * <p>The file is only deleted when this instance holds the lock, or is able to acquire it at
+     * the time of the call. The channel and the stream are closed even if releasing the lock
+     * fails.
+     *
+     * @throws IOException If an I/O error occurs
+     */
+    public void unlockAndDestroy() throws IOException {
+        // Deleting the file while another FileLock still holds it would allow a third party to
+        // re-create and lock it concurrently, so first make sure nobody else needs the file.
+        final boolean deletable = isValid() || (outputStream != null && tryLock());
+        try {
+            unlock();
+        } finally {
+            try {
+                closeResources();
+            } finally {
+                if (deletable) {
+                    this.file.delete();
+                }
+            }
+        }
+    }
+
+    /**
+     * Close the lock's channel and the output stream. The stream is closed even if closing the
+     * channel fails.
+     *
+     * @throws IOException If closing the channel or the stream fails
+     */
+    private void closeResources() throws IOException {
+        try {
+            if (lock != null) {
+                lock.channel().close();
+                lock = null;
+            }
+        } finally {
+            if (outputStream != null) {
+                outputStream.close();
+                outputStream = null;
+            }
+        }
+    }
+
+    /**
+     * Check whether a FileLock is actually holding the lock.
+     *
+     * @return True if it is actually holding the lock
+     */
+    public boolean isValid() {
+        return lock != null && lock.isValid();
+    }
+
+    /**
+     * Normalize the file name, which only allows slash, backslash, digits, letters and
+     * underscore.
+     *
+     * @param fileName Original file name
+     * @return File name with illegal characters stripped
+     */
+    private static String normalizeFileName(String fileName) {
+        return fileName.replaceAll("[^\\w/\\\\]", "");
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 89b11df..5da1fd2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -132,7 +132,7 @@ public class NetUtils {
* if SO_TIMEOUT is greater than 0, then this method will suppress SocketTimeoutException;
* must not be null; SO_TIMEOUT option must be set to 0
* @return the new Socket
- * @exception IOException see {@link ServerSocket#accept()}
+ * @throws IOException see {@link ServerSocket#accept()}
* @see <a href="https://bugs.openjdk.java.net/browse/JDK-8237858">JDK-8237858</a>
*/
public static Socket acceptWithoutTimeout(ServerSocket serverSocket) throws IOException {
@@ -158,12 +158,17 @@ public class NetUtils {
*
* @return A non-occupied port.
*/
- public static int getAvailablePort() {
+ public static Port getAvailablePort() {
for (int i = 0; i < 50; i++) {
try (ServerSocket serverSocket = new ServerSocket(0)) {
int port = serverSocket.getLocalPort();
if (port != 0) {
- return port;
+ FileLock fileLock = new FileLock(NetUtils.class.getName() + port);
+ if (fileLock.tryLock()) {
+ return new Port(port, fileLock);
+ } else {
+ fileLock.unlockAndDestroy();
+ }
}
} catch (IOException ignored) {
}
@@ -498,4 +503,29 @@ public class NetUtils {
public static boolean isValidHostPort(int port) {
return 0 <= port && port <= 65535;
}
+
+    /**
+     * Port wrapper class which holds a {@link FileLock} until it is closed. Used to avoid race
+     * condition among multiple threads/processes.
+     */
+    public static class Port implements AutoCloseable {
+        private final int port;
+        private final FileLock fileLock;
+
+        /**
+         * Wrap a port number together with the lock that reserves it.
+         *
+         * @param port The reserved port number
+         * @param fileLock The lock guarding the port; it must not be null and must currently be
+         *     held (checked via {@link FileLock#isValid()})
+         * @throws IOException Kept in the signature for existing callers
+         */
+        public Port(int port, FileLock fileLock) throws IOException {
+            Preconditions.checkNotNull(fileLock, "FileLock should not be null");
+            Preconditions.checkState(fileLock.isValid(), "FileLock should be locked");
+            this.port = port;
+            this.fileLock = fileLock;
+        }
+
+        /**
+         * Get the wrapped port number.
+         *
+         * @return The reserved port number
+         */
+        public int getPort() {
+            return port;
+        }
+
+        /**
+         * Release the reservation by delegating to {@link FileLock#unlockAndDestroy()}.
+         *
+         * @throws IOException If the underlying lock cannot be released or closed
+         */
+        @Override
+        public void close() throws IOException {
+            fileLock.unlockAndDestroy();
+        }
+    }
}
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 348062c..ca7d990 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -362,8 +362,8 @@ final class PythonEnvUtils {
Thread thread =
new Thread(
() -> {
- try {
- int freePort = NetUtils.getAvailablePort();
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ int freePort = port.getPort();
GatewayServer server =
new GatewayServer.GatewayServerBuilder()
.gateway(
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 83c8613..383fe41 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -255,10 +255,10 @@ public class ClientTest extends TestLogger {
Client<KvStateInternalRequest, KvStateResponse> client = null;
- try {
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
client = new Client<>("Test Client", 1, serializer, stats);
- int availablePort = NetUtils.getAvailablePort();
+ int availablePort = port.getPort();
InetSocketAddress serverAddress =
new InetSocketAddress(InetAddress.getLocalHost(), availablePort);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 8a440d2..fd5f490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -88,21 +88,26 @@ public class NettyClientServerSslTest extends TestLogger {
NettyProtocol protocol = new NoOpProtocol();
- NettyConfig nettyConfig = createNettyConfig(sslConfig);
-
- final NettyBufferPool bufferPool = new NettyBufferPool(1);
- final NettyServer server =
- NettyTestUtil.initServer(
- nettyConfig,
- bufferPool,
- sslHandlerFactory ->
- new TestingServerChannelInitializer(
- protocol,
- sslHandlerFactory,
- serverChannelInitComplete,
- serverSslHandler));
- final NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool);
- final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);
+ NettyServerAndClient serverAndClient;
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ NettyConfig nettyConfig = createNettyConfig(sslConfig, port);
+
+ final NettyBufferPool bufferPool = new NettyBufferPool(1);
+ final NettyServer server =
+ NettyTestUtil.initServer(
+ nettyConfig,
+ bufferPool,
+ sslHandlerFactory ->
+ new TestingServerChannelInitializer(
+ protocol,
+ sslHandlerFactory,
+ serverChannelInitComplete,
+ serverSslHandler));
+ final NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool);
+ serverAndClient = new NettyServerAndClient(server, client);
+ }
+ Assert.assertNotNull(
+ "serverAndClient is null due to fail to get a free port", serverAndClient);
Channel ch = NettyTestUtil.connect(serverAndClient);
@@ -175,10 +180,10 @@ public class NettyClientServerSslTest extends TestLogger {
// Modify the keystore password to an incorrect one
config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
- NettyConfig nettyConfig = createNettyConfig(config);
-
NettyTestUtil.NettyServerAndClient serverAndClient = null;
- try {
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ NettyConfig nettyConfig = createNettyConfig(config, port);
+
serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
Assert.fail("Created server and client from invalid configuration");
} catch (Exception e) {
@@ -199,11 +204,14 @@ public class NettyClientServerSslTest extends TestLogger {
config.setString(
SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
- NettyConfig nettyConfig = createNettyConfig(config);
-
- NettyTestUtil.NettyServerAndClient serverAndClient =
- NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+ NettyTestUtil.NettyServerAndClient serverAndClient;
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ NettyConfig nettyConfig = createNettyConfig(config, port);
+ serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+ }
+ Assert.assertNotNull(
+ "serverAndClient is null due to fail to get a free port", serverAndClient);
Channel ch = NettyTestUtil.connect(serverAndClient);
ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -222,17 +230,23 @@ public class NettyClientServerSslTest extends TestLogger {
clientConfig.setString(
SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
- final NettyConfig nettyServerConfig = createNettyConfig(serverConfig);
- final NettyConfig nettyClientConfig = createNettyConfig(clientConfig);
+ NettyServerAndClient serverAndClient;
+ try (NetUtils.Port serverPort = NetUtils.getAvailablePort();
+ NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
+ final NettyConfig nettyServerConfig = createNettyConfig(serverConfig, serverPort);
+ final NettyConfig nettyClientConfig = createNettyConfig(clientConfig, clientPort);
- final NettyBufferPool bufferPool = new NettyBufferPool(1);
- final NettyProtocol protocol = new NoOpProtocol();
+ final NettyBufferPool bufferPool = new NettyBufferPool(1);
+ final NettyProtocol protocol = new NoOpProtocol();
- final NettyServer server =
- NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
- final NettyClient client =
- NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
- final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);
+ final NettyServer server =
+ NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
+ final NettyClient client =
+ NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
+ serverAndClient = new NettyServerAndClient(server, client);
+ }
+ Assert.assertNotNull(
+ "serverAndClient is null due to fail to get a free port", serverAndClient);
final Channel ch = NettyTestUtil.connect(serverAndClient);
ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -253,11 +267,14 @@ public class NettyClientServerSslTest extends TestLogger {
config.setString(
SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT,
SSLUtilsTest.getCertificateFingerprint(config, "flink.test"));
+ NettyTestUtil.NettyServerAndClient serverAndClient;
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ NettyConfig nettyConfig = createNettyConfig(config, port);
- NettyConfig nettyConfig = createNettyConfig(config);
-
- NettyTestUtil.NettyServerAndClient serverAndClient =
- NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+ serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+ }
+ Assert.assertNotNull(
+ "serverAndClient is null due to fail to get a free port", serverAndClient);
Channel ch = NettyTestUtil.connect(serverAndClient);
ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -278,11 +295,14 @@ public class NettyClientServerSslTest extends TestLogger {
SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT,
SSLUtilsTest.getCertificateFingerprint(config, "flink.test")
.replaceAll("[0-9A-Z]", "0"));
+ NettyTestUtil.NettyServerAndClient serverAndClient;
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ NettyConfig nettyConfig = createNettyConfig(config, port);
- NettyConfig nettyConfig = createNettyConfig(config);
-
- NettyTestUtil.NettyServerAndClient serverAndClient =
- NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+ serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+ }
+ Assert.assertNotNull(
+ "serverAndClient is null due to fail to get a free port", serverAndClient);
Channel ch = NettyTestUtil.connect(serverAndClient);
ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -296,10 +316,11 @@ public class NettyClientServerSslTest extends TestLogger {
return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(sslProvider);
}
- private static NettyConfig createNettyConfig(Configuration config) {
+ private static NettyConfig createNettyConfig(Configuration config, NetUtils.Port availablePort)
+ throws Exception {
return new NettyConfig(
InetAddress.getLoopbackAddress(),
- NetUtils.getAvailablePort(),
+ availablePort.getPort(),
NettyTestUtil.DEFAULT_SEGMENT_SIZE,
1,
config);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 2e30726..9112553 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Field;
import java.net.InetAddress;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/** Simple netty connection manager test. */
public class NettyConnectionManagerTest {
@@ -46,17 +47,21 @@ public class NettyConnectionManagerTest {
public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
// Expected number of arenas and threads
int numberOfSlots = 2;
-
- NettyConfig config =
- new NettyConfig(
- InetAddress.getLocalHost(),
- NetUtils.getAvailablePort(),
- 1024,
- numberOfSlots,
- new Configuration());
-
- NettyConnectionManager connectionManager = createNettyConnectionManager(config);
- connectionManager.start();
+ NettyConnectionManager connectionManager;
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ NettyConfig config =
+ new NettyConfig(
+ InetAddress.getLocalHost(),
+ port.getPort(),
+ 1024,
+ numberOfSlots,
+ new Configuration());
+
+ connectionManager = createNettyConnectionManager(config);
+ connectionManager.start();
+ }
+ assertNotNull(
+ "connectionManager is null due to fail to get a free port", connectionManager);
assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
@@ -111,18 +116,20 @@ public class NettyConnectionManagerTest {
flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_CLIENT, 3);
flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4);
- NettyConfig config =
- new NettyConfig(
- InetAddress.getLocalHost(),
- NetUtils.getAvailablePort(),
- 1024,
- 1337,
- flinkConfig);
+ NettyConnectionManager connectionManager;
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
- NettyConnectionManager connectionManager = createNettyConnectionManager(config);
- connectionManager.start();
+ NettyConfig config =
+ new NettyConfig(
+ InetAddress.getLocalHost(), port.getPort(), 1024, 1337, flinkConfig);
- assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
+ connectionManager = createNettyConnectionManager(config);
+ connectionManager.start();
+
+ assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
+ }
+ assertNotNull(
+ "connectionManager is null due to fail to get a free port", connectionManager);
{
// Client event loop group
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index 9fd978d..3d8e8d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -236,16 +236,19 @@ public class NettyPartitionRequestClientTest {
private NettyPartitionRequestClient createPartitionRequestClient(
Channel tcpChannel, NetworkClientHandler clientHandler) throws Exception {
- int port = NetUtils.getAvailablePort();
- ConnectionID connectionID = new ConnectionID(new InetSocketAddress("localhost", port), 0);
- NettyConfig config =
- new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
- NettyClient nettyClient = new NettyClient(config);
- PartitionRequestClientFactory partitionRequestClientFactory =
- new PartitionRequestClientFactory(nettyClient);
-
- return new NettyPartitionRequestClient(
- tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
+ try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
+ int port = availablePort.getPort();
+ ConnectionID connectionID =
+ new ConnectionID(new InetSocketAddress("localhost", port), 0);
+ NettyConfig config =
+ new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
+ NettyClient nettyClient = new NettyClient(config);
+ PartitionRequestClientFactory partitionRequestClientFactory =
+ new PartitionRequestClientFactory(nettyClient);
+
+ return new NettyPartitionRequestClient(
+ tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
+ }
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index b9667bf..0ec9b53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -161,8 +161,10 @@ public class NettyTestUtil {
checkArgument(segmentSize > 0);
checkNotNull(config);
- return new NettyConfig(
- InetAddress.getLocalHost(), NetUtils.getAvailablePort(), segmentSize, 1, config);
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ return new NettyConfig(
+ InetAddress.getLocalHost(), port.getPort(), segmentSize, 1, config);
+ }
}
// ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 884e66b..67793d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
@@ -47,8 +46,6 @@ import static org.mockito.Mockito.mock;
/** {@link PartitionRequestClientFactory} test. */
public class PartitionRequestClientFactoryTest extends TestLogger {
- private static final int SERVER_PORT = NetUtils.getAvailablePort();
-
@Test
public void testInterruptsNotCached() throws Exception {
NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 3f078a1..63032fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -396,42 +396,46 @@ public class TaskExecutorSubmissionTest extends TestLogger {
*/
@Test
public void testRemotePartitionNotFound() throws Exception {
- final int dataPort = NetUtils.getAvailablePort();
- Configuration config = new Configuration();
- config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
- config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-
- // Remote location (on the same TM though) for the partition
- NettyShuffleDescriptor sdd =
- NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote();
- TaskDeploymentDescriptor tdd = createReceiver(sdd);
- ExecutionAttemptID eid = tdd.getExecutionAttemptId();
-
- final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
- final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
-
- try (TaskSubmissionTestEnvironment env =
- new TaskSubmissionTestEnvironment.Builder(jobId)
- .setSlotSize(2)
- .addTaskManagerActionListener(
- eid, ExecutionState.RUNNING, taskRunningFuture)
- .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
- .setConfiguration(config)
- .setLocalCommunication(false)
- .useRealNonMockShuffleEnvironment()
- .build()) {
- TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
-
- taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
- tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
- taskRunningFuture.get();
-
- taskFailedFuture.get();
- assertThat(
- taskSlotTable.getTask(eid).getFailureCause(),
- instanceOf(PartitionNotFoundException.class));
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ final int dataPort = port.getPort();
+
+ Configuration config = new Configuration();
+ config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
+ config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+
+ // Remote location (on the same TM though) for the partition
+ NettyShuffleDescriptor sdd =
+ NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote();
+ TaskDeploymentDescriptor tdd = createReceiver(sdd);
+ ExecutionAttemptID eid = tdd.getExecutionAttemptId();
+
+ final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
+ final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
+
+ try (TaskSubmissionTestEnvironment env =
+ new TaskSubmissionTestEnvironment.Builder(jobId)
+ .setSlotSize(2)
+ .addTaskManagerActionListener(
+ eid, ExecutionState.RUNNING, taskRunningFuture)
+ .addTaskManagerActionListener(
+ eid, ExecutionState.FAILED, taskFailedFuture)
+ .setConfiguration(config)
+ .setLocalCommunication(false)
+ .useRealNonMockShuffleEnvironment()
+ .build()) {
+ TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+ TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
+
+ taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
+ tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
+ taskRunningFuture.get();
+
+ taskFailedFuture.get();
+ assertThat(
+ taskSlotTable.getTask(eid).getFailureCause(),
+ instanceOf(PartitionNotFoundException.class));
+ }
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 58eb33c..3426ff7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -2182,27 +2182,31 @@ public class TaskExecutorTest extends TestLogger {
@Test(timeout = 10000L)
public void testLogNotFoundHandling() throws Throwable {
- final int dataPort = NetUtils.getAvailablePort();
- configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
- configuration.setInteger(
- NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
- configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
-
- try (TaskSubmissionTestEnvironment env =
- new Builder(jobId)
- .setConfiguration(configuration)
- .setLocalCommunication(false)
- .build()) {
- TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- try {
- CompletableFuture<TransientBlobKey> logFuture =
- tmGateway.requestFileUploadByType(FileType.LOG, timeout);
- logFuture.get();
- } catch (Exception e) {
- assertThat(
- e.getMessage(),
- containsString("The file LOG does not exist on the TaskExecutor."));
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ int dataPort = port.getPort();
+
+ configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+ try (TaskSubmissionTestEnvironment env =
+ new Builder(jobId)
+ .setConfiguration(configuration)
+ .setLocalCommunication(false)
+ .build()) {
+ TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+ try {
+ CompletableFuture<TransientBlobKey> logFuture =
+ tmGateway.requestFileUploadByType(FileType.LOG, timeout);
+ logFuture.get();
+ } catch (Exception e) {
+ assertThat(
+ e.getMessage(),
+ containsString("The file LOG does not exist on the TaskExecutor."));
+ }
}
}
}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 93af2fa..271d42d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -146,15 +146,15 @@ public class IPv6HostnamesITCase extends TestLogger {
// test whether Akka's netty can bind to the address
log.info("Testing whether Akka can use " + addr);
- int port = NetUtils.getAvailablePort();
-
- final RpcService rpcService =
- RpcSystem.load()
- .localServiceBuilder(new Configuration())
- .withBindAddress(addr.getHostAddress())
- .withBindPort(port)
- .createAndStart();
- rpcService.stopService().get();
+ try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ final RpcService rpcService =
+ RpcSystem.load()
+ .localServiceBuilder(new Configuration())
+ .withBindAddress(addr.getHostAddress())
+ .withBindPort(port.getPort())
+ .createAndStart();
+ rpcService.stopService().get();
+ }
log.info("Using address " + addr);
return (Inet6Address) addr;