You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fp...@apache.org on 2022/01/05 14:39:57 UTC

[flink] branch master updated: [FLINK-22821][core] Stabilize NetUtils#getAvailablePort in order to avoid wrongly allocating any used ports

This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd1fddb  [FLINK-22821][core] Stabilize NetUtils#getAvailablePort in order to avoid wrongly allocating any used ports
dd1fddb is described below

commit dd1fddb13b2d08ade580e5b3ec6b8e910974308d
Author: Yao Zhang <xz...@126.com>
AuthorDate: Wed Jun 2 16:17:02 2021 +0800

    [FLINK-22821][core] Stabilize NetUtils#getAvailablePort in order to avoid wrongly allocating any used ports
---
 .../apache/flink/client/program/ClientTest.java    |  15 +-
 .../connectors/kafka/KafkaTestEnvironmentImpl.java |  34 +++--
 .../main/java/org/apache/flink/util/FileLock.java  | 166 +++++++++++++++++++++
 .../main/java/org/apache/flink/util/NetUtils.java  |  36 ++++-
 .../apache/flink/client/python/PythonEnvUtils.java |   4 +-
 .../flink/queryablestate/network/ClientTest.java   |   4 +-
 .../io/network/netty/NettyClientServerSslTest.java | 103 ++++++++-----
 .../network/netty/NettyConnectionManagerTest.java  |  49 +++---
 .../netty/NettyPartitionRequestClientTest.java     |  23 +--
 .../runtime/io/network/netty/NettyTestUtil.java    |   6 +-
 .../netty/PartitionRequestClientFactoryTest.java   |   3 -
 .../taskexecutor/TaskExecutorSubmissionTest.java   |  76 +++++-----
 .../runtime/taskexecutor/TaskExecutorTest.java     |  46 +++---
 .../flink/test/runtime/IPv6HostnamesITCase.java    |  18 +--
 14 files changed, 416 insertions(+), 167 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 19d8e22..4d8e47d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -56,6 +56,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -86,6 +87,8 @@ public class ClientTest extends TestLogger {
 
     private Plan plan;
 
+    private NetUtils.Port port;
+
     private Configuration config;
 
     private static final String TEST_EXECUTOR_NAME = "test_executor";
@@ -102,14 +105,22 @@ public class ClientTest extends TestLogger {
         env.generateSequence(1, 1000).output(new DiscardingOutputFormat<>());
         plan = env.createProgramPlan();
 
-        final int freePort = NetUtils.getAvailablePort();
         config = new Configuration();
         config.setString(JobManagerOptions.ADDRESS, "localhost");
-        config.setInteger(JobManagerOptions.PORT, freePort);
+        port = NetUtils.getAvailablePort(); // assign the field so tearDown() can release it
+        config.setInteger(JobManagerOptions.PORT, port.getPort());
+
         config.set(
                 AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
     }
 
+    @After
+    public void tearDown() throws Exception {
+        if (port != null) {
+            port.close();
+        }
+    }
+
     private Configuration fromPackagedProgram(
             final PackagedProgram program, final int parallelism, final boolean detached) {
         final Configuration configuration = new Configuration();
diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 47ca5a7..b601680 100644
--- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -119,7 +119,10 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         zookeeper = null;
         brokers.clear();
 
-        zookeeper = new TestingServer(-1, tmpZkDir);
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            zookeeper = new TestingServer(port.getPort(), tmpZkDir);
+        }
+
         zookeeperConnectionString = zookeeper.getConnectString();
         LOG.info(
                 "Starting Zookeeper with zookeeperConnectionString: {}", zookeeperConnectionString);
@@ -427,22 +430,23 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
         final int numTries = 5;
 
         for (int i = 1; i <= numTries; i++) {
-            int kafkaPort = NetUtils.getAvailablePort();
-            kafkaProperties.put("port", Integer.toString(kafkaPort));
-
-            // to support secure kafka cluster
-            if (config.isSecureMode()) {
-                LOG.info("Adding Kafka secure configurations");
-                kafkaProperties.put(
-                        "listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-                kafkaProperties.put(
-                        "advertised.listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
-                kafkaProperties.putAll(getSecureProperties());
-            }
+            try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+                int kafkaPort = port.getPort();
+                kafkaProperties.put("port", Integer.toString(kafkaPort));
+
+                // to support secure kafka cluster
+                if (config.isSecureMode()) {
+                    LOG.info("Adding Kafka secure configurations");
+                    kafkaProperties.put(
+                            "listeners", "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                    kafkaProperties.put(
+                            "advertised.listeners",
+                            "SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
+                    kafkaProperties.putAll(getSecureProperties());
+                }
 
-            KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+                KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
-            try {
                 scala.Option<String> stringNone = scala.Option.apply(null);
                 KafkaServer server = new KafkaServer(kafkaConfig, Time.SYSTEM, stringNone, false);
                 server.startup();
diff --git a/flink-core/src/main/java/org/apache/flink/util/FileLock.java b/flink-core/src/main/java/org/apache/flink/util/FileLock.java
new file mode 100644
index 0000000..c247c94
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/FileLock.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/** A file lock used for avoiding race condition among multiple threads/processes. */
+@Internal
+public class FileLock {
+    private static final String TEMP_DIR = System.getProperty("java.io.tmpdir");
+    private final File file;
+    private FileOutputStream outputStream;
+    private java.nio.channels.FileLock lock;
+
+    /**
+     * Initialize a FileLock using a file located at fullPath.
+     *
+     * @param fullPath The path of the locking file
+     */
+    public FileLock(String fullPath) {
+        Preconditions.checkNotNull(fullPath, "fullPath should not be null");
+        Path path = Paths.get(fullPath);
+        String normalizedFileName = normalizeFileName(path.getFileName().toString());
+        if (normalizedFileName.isEmpty()) {
+            throw new IllegalArgumentException("There are no legal characters in the file name");
+        }
+        this.file =
+                path.getParent() == null
+                        ? new File(normalizedFileName)
+                        : new File(path.getParent().toString(), normalizedFileName);
+    }
+
+    /**
+     * Initialize a FileLock using a file located at parentDir/fileName.
+     *
+     * @param parentDir The parent dir of the locking file
+     * @param fileName The name of the locking file
+     */
+    public FileLock(String parentDir, String fileName) {
+        Preconditions.checkNotNull(parentDir, "parentDir should not be null");
+        Preconditions.checkNotNull(fileName, "fileName should not be null");
+        this.file = new File(parentDir, normalizeFileName(fileName));
+    }
+
+    /**
+     * Initialize a FileLock using a file located inside temp folder.
+     *
+     * @param fileName The name of the locking file
+     * @return The initialized FileLock
+     */
+    public static FileLock inTempFolder(String fileName) {
+        return new FileLock(TEMP_DIR, fileName);
+    }
+
+    /**
+     * Check whether the locking file exists in the file system. Create it if it does not exist.
+     * Then create a FileOutputStream for it.
+     *
+     * @throws IOException If the file path is invalid or the parent dir does not exist
+     */
+    private void init() throws IOException {
+        if (!this.file.exists()) {
+            this.file.createNewFile();
+        }
+        outputStream = new FileOutputStream(this.file);
+    }
+
+    /**
+     * Try to acquire a lock on the locking file. This method returns immediately, whether or not
+     * the lock is acquired.
+     *
+     * @return True if successfully acquired the lock
+     * @throws IOException If the file path is invalid
+     */
+    public boolean tryLock() throws IOException {
+        if (outputStream == null) {
+            init();
+        }
+        try {
+            lock = outputStream.getChannel().tryLock();
+        } catch (Exception e) {
+            return false;
+        }
+
+        return lock != null;
+    }
+
+    /**
+     * Release the file lock.
+     *
+     * @throws IOException If the FileChannel is closed
+     */
+    public void unlock() throws IOException {
+        if (lock != null && lock.channel().isOpen()) {
+            lock.release();
+        }
+    }
+
+    /**
+     * Release the file lock, close the FileChannel and FileOutputStream, then try to delete the
+     * locking file. The deletion is attempted unconditionally, so call this only when the lock
+     * will not be used anymore.
+     *
+     * @throws IOException If an I/O error occurs
+     */
+    public void unlockAndDestroy() throws IOException {
+        try {
+            unlock();
+            if (lock != null) {
+                lock.channel().close();
+                lock = null;
+            }
+            if (outputStream != null) {
+                outputStream.close();
+                outputStream = null;
+            }
+
+        } finally {
+            this.file.delete();
+        }
+    }
+
+    /**
+     * Check whether a FileLock is actually holding the lock.
+     *
+     * @return True if it is actually holding the lock
+     */
+    public boolean isValid() {
+        if (lock != null) {
+            return lock.isValid();
+        }
+        return false;
+    }
+
+    /**
+     * Normalize the file name, keeping only letters, digits, underscores, slashes and backslashes.
+     *
+     * @param fileName Original file name
+     * @return File name with illegal characters stripped
+     */
+    private static String normalizeFileName(String fileName) {
+        return fileName.replaceAll("[^\\w/\\\\]", "");
+    }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
index 89b11df..5da1fd2 100644
--- a/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/NetUtils.java
@@ -132,7 +132,7 @@ public class NetUtils {
      *     if SO_TIMEOUT is greater than 0, then this method will suppress SocketTimeoutException;
      *     must not be null; SO_TIMEOUT option must be set to 0
      * @return the new Socket
-     * @exception IOException see {@link ServerSocket#accept()}
+     * @throws IOException see {@link ServerSocket#accept()}
      * @see <a href="https://bugs.openjdk.java.net/browse/JDK-8237858">JDK-8237858</a>
      */
     public static Socket acceptWithoutTimeout(ServerSocket serverSocket) throws IOException {
@@ -158,12 +158,17 @@ public class NetUtils {
      *
      * @return A non-occupied port.
      */
-    public static int getAvailablePort() {
+    public static Port getAvailablePort() {
         for (int i = 0; i < 50; i++) {
             try (ServerSocket serverSocket = new ServerSocket(0)) {
                 int port = serverSocket.getLocalPort();
                 if (port != 0) {
-                    return port;
+                    FileLock fileLock = new FileLock(NetUtils.class.getName() + port);
+                    if (fileLock.tryLock()) {
+                        return new Port(port, fileLock);
+                    } else {
+                        fileLock.unlockAndDestroy();
+                    }
                 }
             } catch (IOException ignored) {
             }
@@ -498,4 +503,29 @@ public class NetUtils {
     public static boolean isValidHostPort(int port) {
         return 0 <= port && port <= 65535;
     }
+
+    /**
+     * Port wrapper class which holds a {@link FileLock} until it is closed. Used to avoid race
+     * conditions among multiple threads/processes.
+     */
+    public static class Port implements AutoCloseable {
+        private final int port;
+        private final FileLock fileLock;
+
+        public Port(int port, FileLock fileLock) throws IOException {
+            Preconditions.checkNotNull(fileLock, "FileLock should not be null");
+            Preconditions.checkState(fileLock.isValid(), "FileLock should be locked");
+            this.port = port;
+            this.fileLock = fileLock;
+        }
+
+        public int getPort() {
+            return port;
+        }
+
+        @Override
+        public void close() throws Exception {
+            fileLock.unlockAndDestroy();
+        }
+    }
 }
diff --git a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
index 348062c..ca7d990 100644
--- a/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
+++ b/flink-python/src/main/java/org/apache/flink/client/python/PythonEnvUtils.java
@@ -362,8 +362,8 @@ final class PythonEnvUtils {
         Thread thread =
                 new Thread(
                         () -> {
-                            try {
-                                int freePort = NetUtils.getAvailablePort();
+                            try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+                                int freePort = port.getPort();
                                 GatewayServer server =
                                         new GatewayServer.GatewayServerBuilder()
                                                 .gateway(
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 83c8613..383fe41 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -255,10 +255,10 @@ public class ClientTest extends TestLogger {
 
         Client<KvStateInternalRequest, KvStateResponse> client = null;
 
-        try {
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
             client = new Client<>("Test Client", 1, serializer, stats);
 
-            int availablePort = NetUtils.getAvailablePort();
+            int availablePort = port.getPort();
 
             InetSocketAddress serverAddress =
                     new InetSocketAddress(InetAddress.getLocalHost(), availablePort);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
index 8a440d2..fd5f490 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.java
@@ -88,21 +88,26 @@ public class NettyClientServerSslTest extends TestLogger {
 
         NettyProtocol protocol = new NoOpProtocol();
 
-        NettyConfig nettyConfig = createNettyConfig(sslConfig);
-
-        final NettyBufferPool bufferPool = new NettyBufferPool(1);
-        final NettyServer server =
-                NettyTestUtil.initServer(
-                        nettyConfig,
-                        bufferPool,
-                        sslHandlerFactory ->
-                                new TestingServerChannelInitializer(
-                                        protocol,
-                                        sslHandlerFactory,
-                                        serverChannelInitComplete,
-                                        serverSslHandler));
-        final NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool);
-        final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);
+        NettyServerAndClient serverAndClient;
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            NettyConfig nettyConfig = createNettyConfig(sslConfig, port);
+
+            final NettyBufferPool bufferPool = new NettyBufferPool(1);
+            final NettyServer server =
+                    NettyTestUtil.initServer(
+                            nettyConfig,
+                            bufferPool,
+                            sslHandlerFactory ->
+                                    new TestingServerChannelInitializer(
+                                            protocol,
+                                            sslHandlerFactory,
+                                            serverChannelInitComplete,
+                                            serverSslHandler));
+            final NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool);
+            serverAndClient = new NettyServerAndClient(server, client);
+        }
+        Assert.assertNotNull(
+                "serverAndClient is null due to fail to get a free port", serverAndClient);
 
         Channel ch = NettyTestUtil.connect(serverAndClient);
 
@@ -175,10 +180,10 @@ public class NettyClientServerSslTest extends TestLogger {
         // Modify the keystore password to an incorrect one
         config.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
 
-        NettyConfig nettyConfig = createNettyConfig(config);
-
         NettyTestUtil.NettyServerAndClient serverAndClient = null;
-        try {
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            NettyConfig nettyConfig = createNettyConfig(config, port);
+
             serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
             Assert.fail("Created server and client from invalid configuration");
         } catch (Exception e) {
@@ -199,11 +204,14 @@ public class NettyClientServerSslTest extends TestLogger {
         config.setString(
                 SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
 
-        NettyConfig nettyConfig = createNettyConfig(config);
-
-        NettyTestUtil.NettyServerAndClient serverAndClient =
-                NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+        NettyTestUtil.NettyServerAndClient serverAndClient;
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            NettyConfig nettyConfig = createNettyConfig(config, port);
 
+            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+        }
+        Assert.assertNotNull(
+                "serverAndClient is null due to fail to get a free port", serverAndClient);
         Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
 
@@ -222,17 +230,23 @@ public class NettyClientServerSslTest extends TestLogger {
         clientConfig.setString(
                 SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
 
-        final NettyConfig nettyServerConfig = createNettyConfig(serverConfig);
-        final NettyConfig nettyClientConfig = createNettyConfig(clientConfig);
+        NettyServerAndClient serverAndClient;
+        try (NetUtils.Port serverPort = NetUtils.getAvailablePort();
+                NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
+            final NettyConfig nettyServerConfig = createNettyConfig(serverConfig, serverPort);
+            final NettyConfig nettyClientConfig = createNettyConfig(clientConfig, clientPort);
 
-        final NettyBufferPool bufferPool = new NettyBufferPool(1);
-        final NettyProtocol protocol = new NoOpProtocol();
+            final NettyBufferPool bufferPool = new NettyBufferPool(1);
+            final NettyProtocol protocol = new NoOpProtocol();
 
-        final NettyServer server =
-                NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
-        final NettyClient client =
-                NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
-        final NettyServerAndClient serverAndClient = new NettyServerAndClient(server, client);
+            final NettyServer server =
+                    NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
+            final NettyClient client =
+                    NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
+            serverAndClient = new NettyServerAndClient(server, client);
+        }
+        Assert.assertNotNull(
+                "serverAndClient is null due to fail to get a free port", serverAndClient);
 
         final Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -253,11 +267,14 @@ public class NettyClientServerSslTest extends TestLogger {
         config.setString(
                 SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT,
                 SSLUtilsTest.getCertificateFingerprint(config, "flink.test"));
+        NettyTestUtil.NettyServerAndClient serverAndClient;
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            NettyConfig nettyConfig = createNettyConfig(config, port);
 
-        NettyConfig nettyConfig = createNettyConfig(config);
-
-        NettyTestUtil.NettyServerAndClient serverAndClient =
-                NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+        }
+        Assert.assertNotNull(
+                "serverAndClient is null due to fail to get a free port", serverAndClient);
 
         Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -278,11 +295,14 @@ public class NettyClientServerSslTest extends TestLogger {
                 SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT,
                 SSLUtilsTest.getCertificateFingerprint(config, "flink.test")
                         .replaceAll("[0-9A-Z]", "0"));
+        NettyTestUtil.NettyServerAndClient serverAndClient;
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            NettyConfig nettyConfig = createNettyConfig(config, port);
 
-        NettyConfig nettyConfig = createNettyConfig(config);
-
-        NettyTestUtil.NettyServerAndClient serverAndClient =
-                NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
+        }
+        Assert.assertNotNull(
+                "serverAndClient is null due to fail to get a free port", serverAndClient);
 
         Channel ch = NettyTestUtil.connect(serverAndClient);
         ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
@@ -296,10 +316,11 @@ public class NettyClientServerSslTest extends TestLogger {
         return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(sslProvider);
     }
 
-    private static NettyConfig createNettyConfig(Configuration config) {
+    private static NettyConfig createNettyConfig(Configuration config, NetUtils.Port availablePort)
+            throws Exception {
         return new NettyConfig(
                 InetAddress.getLoopbackAddress(),
-                NetUtils.getAvailablePort(),
+                availablePort.getPort(),
                 NettyTestUtil.DEFAULT_SEGMENT_SIZE,
                 1,
                 config);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index 2e30726..9112553 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Field;
 import java.net.InetAddress;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /** Simple netty connection manager test. */
 public class NettyConnectionManagerTest {
@@ -46,17 +47,21 @@ public class NettyConnectionManagerTest {
     public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception {
         // Expected number of arenas and threads
         int numberOfSlots = 2;
-
-        NettyConfig config =
-                new NettyConfig(
-                        InetAddress.getLocalHost(),
-                        NetUtils.getAvailablePort(),
-                        1024,
-                        numberOfSlots,
-                        new Configuration());
-
-        NettyConnectionManager connectionManager = createNettyConnectionManager(config);
-        connectionManager.start();
+        NettyConnectionManager connectionManager;
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            NettyConfig config =
+                    new NettyConfig(
+                            InetAddress.getLocalHost(),
+                            port.getPort(),
+                            1024,
+                            numberOfSlots,
+                            new Configuration());
+
+            connectionManager = createNettyConnectionManager(config);
+            connectionManager.start();
+        }
+        assertNotNull(
+                "connectionManager is null due to fail to get a free port", connectionManager);
 
         assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas());
 
@@ -111,18 +116,20 @@ public class NettyConnectionManagerTest {
         flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_CLIENT, 3);
         flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4);
 
-        NettyConfig config =
-                new NettyConfig(
-                        InetAddress.getLocalHost(),
-                        NetUtils.getAvailablePort(),
-                        1024,
-                        1337,
-                        flinkConfig);
+        NettyConnectionManager connectionManager;
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
 
-        NettyConnectionManager connectionManager = createNettyConnectionManager(config);
-        connectionManager.start();
+            NettyConfig config =
+                    new NettyConfig(
+                            InetAddress.getLocalHost(), port.getPort(), 1024, 1337, flinkConfig);
 
-        assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
+            connectionManager = createNettyConnectionManager(config);
+            connectionManager.start();
+
+            assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas());
+        }
+        assertNotNull(
+                "connectionManager is null due to fail to get a free port", connectionManager);
 
         {
             // Client event loop group
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index 9fd978d..3d8e8d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -236,16 +236,19 @@ public class NettyPartitionRequestClientTest {
 
     private NettyPartitionRequestClient createPartitionRequestClient(
             Channel tcpChannel, NetworkClientHandler clientHandler) throws Exception {
-        int port = NetUtils.getAvailablePort();
-        ConnectionID connectionID = new ConnectionID(new InetSocketAddress("localhost", port), 0);
-        NettyConfig config =
-                new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
-        NettyClient nettyClient = new NettyClient(config);
-        PartitionRequestClientFactory partitionRequestClientFactory =
-                new PartitionRequestClientFactory(nettyClient);
-
-        return new NettyPartitionRequestClient(
-                tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
+        try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
+            int port = availablePort.getPort();
+            ConnectionID connectionID =
+                    new ConnectionID(new InetSocketAddress("localhost", port), 0);
+            NettyConfig config =
+                    new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
+            NettyClient nettyClient = new NettyClient(config);
+            PartitionRequestClientFactory partitionRequestClientFactory =
+                    new PartitionRequestClientFactory(nettyClient);
+
+            return new NettyPartitionRequestClient(
+                    tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
+        }
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index b9667bf..0ec9b53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -161,8 +161,10 @@ public class NettyTestUtil {
         checkArgument(segmentSize > 0);
         checkNotNull(config);
 
-        return new NettyConfig(
-                InetAddress.getLocalHost(), NetUtils.getAvailablePort(), segmentSize, 1, config);
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            return new NettyConfig(
+                    InetAddress.getLocalHost(), port.getPort(), segmentSize, 1, config);
+        }
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 884e66b..67793d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
-import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
@@ -47,8 +46,6 @@ import static org.mockito.Mockito.mock;
 /** {@link PartitionRequestClientFactory} test. */
 public class PartitionRequestClientFactoryTest extends TestLogger {
 
-    private static final int SERVER_PORT = NetUtils.getAvailablePort();
-
     @Test
     public void testInterruptsNotCached() throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
index 3f078a1..63032fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorSubmissionTest.java
@@ -396,42 +396,46 @@ public class TaskExecutorSubmissionTest extends TestLogger {
      */
     @Test
     public void testRemotePartitionNotFound() throws Exception {
-        final int dataPort = NetUtils.getAvailablePort();
-        Configuration config = new Configuration();
-        config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
-        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-
-        // Remote location (on the same TM though) for the partition
-        NettyShuffleDescriptor sdd =
-                NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote();
-        TaskDeploymentDescriptor tdd = createReceiver(sdd);
-        ExecutionAttemptID eid = tdd.getExecutionAttemptId();
-
-        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
-        final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
-
-        try (TaskSubmissionTestEnvironment env =
-                new TaskSubmissionTestEnvironment.Builder(jobId)
-                        .setSlotSize(2)
-                        .addTaskManagerActionListener(
-                                eid, ExecutionState.RUNNING, taskRunningFuture)
-                        .addTaskManagerActionListener(eid, ExecutionState.FAILED, taskFailedFuture)
-                        .setConfiguration(config)
-                        .setLocalCommunication(false)
-                        .useRealNonMockShuffleEnvironment()
-                        .build()) {
-            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-            TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
-
-            taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
-            tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
-            taskRunningFuture.get();
-
-            taskFailedFuture.get();
-            assertThat(
-                    taskSlotTable.getTask(eid).getFailureCause(),
-                    instanceOf(PartitionNotFoundException.class));
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            final int dataPort = port.getPort();
+
+            Configuration config = new Configuration();
+            config.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
+            config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+            config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+
+            // Remote location (on the same TM though) for the partition
+            NettyShuffleDescriptor sdd =
+                    NettyShuffleDescriptorBuilder.newBuilder().setDataPort(dataPort).buildRemote();
+            TaskDeploymentDescriptor tdd = createReceiver(sdd);
+            ExecutionAttemptID eid = tdd.getExecutionAttemptId();
+
+            final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
+            final CompletableFuture<Void> taskFailedFuture = new CompletableFuture<>();
+
+            try (TaskSubmissionTestEnvironment env =
+                    new TaskSubmissionTestEnvironment.Builder(jobId)
+                            .setSlotSize(2)
+                            .addTaskManagerActionListener(
+                                    eid, ExecutionState.RUNNING, taskRunningFuture)
+                            .addTaskManagerActionListener(
+                                    eid, ExecutionState.FAILED, taskFailedFuture)
+                            .setConfiguration(config)
+                            .setLocalCommunication(false)
+                            .useRealNonMockShuffleEnvironment()
+                            .build()) {
+                TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+                TaskSlotTable<Task> taskSlotTable = env.getTaskSlotTable();
+
+                taskSlotTable.allocateSlot(0, jobId, tdd.getAllocationId(), Time.seconds(60));
+                tmGateway.submitTask(tdd, env.getJobMasterId(), timeout).get();
+                taskRunningFuture.get();
+
+                taskFailedFuture.get();
+                assertThat(
+                        taskSlotTable.getTask(eid).getFailureCause(),
+                        instanceOf(PartitionNotFoundException.class));
+            }
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 58eb33c..3426ff7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -2182,27 +2182,31 @@ public class TaskExecutorTest extends TestLogger {
 
     @Test(timeout = 10000L)
     public void testLogNotFoundHandling() throws Throwable {
-        final int dataPort = NetUtils.getAvailablePort();
-        configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
-        configuration.setInteger(
-                NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-        configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
-
-        try (TaskSubmissionTestEnvironment env =
-                new Builder(jobId)
-                        .setConfiguration(configuration)
-                        .setLocalCommunication(false)
-                        .build()) {
-            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-            try {
-                CompletableFuture<TransientBlobKey> logFuture =
-                        tmGateway.requestFileUploadByType(FileType.LOG, timeout);
-                logFuture.get();
-            } catch (Exception e) {
-                assertThat(
-                        e.getMessage(),
-                        containsString("The file LOG does not exist on the TaskExecutor."));
+        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+            int dataPort = port.getPort();
+
+            configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
+            configuration.setInteger(
+                    NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+            configuration.setInteger(
+                    NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+            configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+            try (TaskSubmissionTestEnvironment env =
+                    new Builder(jobId)
+                            .setConfiguration(configuration)
+                            .setLocalCommunication(false)
+                            .build()) {
+                TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+                try {
+                    CompletableFuture<TransientBlobKey> logFuture =
+                            tmGateway.requestFileUploadByType(FileType.LOG, timeout);
+                    logFuture.get();
+                } catch (Exception e) {
+                    assertThat(
+                            e.getMessage(),
+                            containsString("The file LOG does not exist on the TaskExecutor."));
+                }
             }
         }
     }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
index 93af2fa..271d42d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java
@@ -146,15 +146,15 @@ public class IPv6HostnamesITCase extends TestLogger {
 
                             // test whether Akka's netty can bind to the address
                             log.info("Testing whether Akka can use " + addr);
-                            int port = NetUtils.getAvailablePort();
-
-                            final RpcService rpcService =
-                                    RpcSystem.load()
-                                            .localServiceBuilder(new Configuration())
-                                            .withBindAddress(addr.getHostAddress())
-                                            .withBindPort(port)
-                                            .createAndStart();
-                            rpcService.stopService().get();
+                            try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+                                final RpcService rpcService =
+                                        RpcSystem.load()
+                                                .localServiceBuilder(new Configuration())
+                                                .withBindAddress(addr.getHostAddress())
+                                                .withBindPort(port.getPort())
+                                                .createAndStart();
+                                rpcService.stopService().get();
+                            }
 
                             log.info("Using address " + addr);
                             return (Inet6Address) addr;