You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2023/01/19 09:48:35 UTC

[flink] branch master updated: [FLINK-30678][tests] Use random port

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e705090c89e [FLINK-30678][tests] Use random port
e705090c89e is described below

commit e705090c89ef9411304d54c57eac2f0080c522a7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 19 10:48:23 2023 +0100

    [FLINK-30678][tests] Use random port
---
 .../apache/flink/client/program/ClientTest.java    | 13 ------
 .../flink/queryablestate/network/ClientTest.java   |  8 +---
 .../network/netty/NettyConnectionManagerTest.java  | 11 ++----
 .../netty/NettyPartitionRequestClientTest.java     | 25 +++++-------
 .../runtime/taskexecutor/TaskExecutorTest.java     | 46 ++++++++++------------
 5 files changed, 36 insertions(+), 67 deletions(-)

diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 4d8f4403e81..6efc8fb0bd1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -54,9 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.NetUtils;
 
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -88,8 +86,6 @@ class ClientTest {
 
     private Plan plan;
 
-    private NetUtils.Port port;
-
     private Configuration config;
 
     private static final String TEST_EXECUTOR_NAME = "test_executor";
@@ -108,20 +104,11 @@ class ClientTest {
 
         config = new Configuration();
         config.setString(JobManagerOptions.ADDRESS, "localhost");
-        port = NetUtils.getAvailablePort();
-        config.setInteger(JobManagerOptions.PORT, port.getPort());
 
         config.set(
                 AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
     }
 
-    @AfterEach
-    void tearDown() throws Exception {
-        if (port != null) {
-            port.close();
-        }
-    }
-
     private Configuration fromPackagedProgram(
             final PackagedProgram program, final int parallelism, final boolean detached) {
         final Configuration configuration = new Configuration();
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 383fe41ee18..193508ce16f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -255,13 +254,10 @@ public class ClientTest extends TestLogger {
 
         Client<KvStateInternalRequest, KvStateResponse> client = null;
 
-        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+        try {
             client = new Client<>("Test Client", 1, serializer, stats);
 
-            int availablePort = port.getPort();
-
-            InetSocketAddress serverAddress =
-                    new InetSocketAddress(InetAddress.getLocalHost(), availablePort);
+            InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
 
             KvStateInternalRequest request =
                     new KvStateInternalRequest(new KvStateID(), new byte[0]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index d313c594d13..f93381db59d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -48,11 +47,11 @@ public class NettyConnectionManagerTest {
         // Expected number of arenas and threads
         int numberOfSlots = 2;
         NettyConnectionManager connectionManager;
-        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+        {
             NettyConfig config =
                     new NettyConfig(
                             InetAddress.getLocalHost(),
-                            port.getPort(),
+                            0,
                             1024,
                             numberOfSlots,
                             new Configuration());
@@ -117,11 +116,9 @@ public class NettyConnectionManagerTest {
         flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4);
 
         NettyConnectionManager connectionManager;
-        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
-
+        {
             NettyConfig config =
-                    new NettyConfig(
-                            InetAddress.getLocalHost(), port.getPort(), 1024, 1337, flinkConfig);
+                    new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1337, flinkConfig);
 
             connectionManager = createNettyConnectionManager(config);
             connectionManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index 6ede9dbdc90..ad34bdf363e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.util.NetUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
@@ -285,20 +284,16 @@ public class NettyPartitionRequestClientTest {
     private NettyPartitionRequestClient createPartitionRequestClient(
             Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled)
             throws Exception {
-        try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
-            int port = availablePort.getPort();
-            ConnectionID connectionID =
-                    new ConnectionID(
-                            ResourceID.generate(), new InetSocketAddress("localhost", port), 0);
-            NettyConfig config =
-                    new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
-            NettyClient nettyClient = new NettyClient(config);
-            PartitionRequestClientFactory partitionRequestClientFactory =
-                    new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
-
-            return new NettyPartitionRequestClient(
-                    tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
-        }
+        ConnectionID connectionID =
+                new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0);
+        NettyConfig config =
+                new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1, new Configuration());
+        NettyClient nettyClient = new NettyClient(config);
+        PartitionRequestClientFactory partitionRequestClientFactory =
+                new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
+
+        return new NettyPartitionRequestClient(
+                tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
     }
 
     /**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index cc857ba0e1f..6d3a885ca43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -109,7 +109,6 @@ import org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.Reference;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.TimeUtils;
@@ -2244,31 +2243,26 @@ public class TaskExecutorTest extends TestLogger {
 
     @Test(timeout = 10000L)
     public void testLogNotFoundHandling() throws Throwable {
-        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
-            int dataPort = port.getPort();
-
-            configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
-            configuration.setInteger(
-                    NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
-            configuration.setInteger(
-                    NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
-            configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
-
-            try (TaskSubmissionTestEnvironment env =
-                    new Builder(jobId)
-                            .setConfiguration(configuration)
-                            .setLocalCommunication(false)
-                            .build(EXECUTOR_RESOURCE.getExecutor())) {
-                TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
-                try {
-                    CompletableFuture<TransientBlobKey> logFuture =
-                            tmGateway.requestFileUploadByType(FileType.LOG, timeout);
-                    logFuture.get();
-                } catch (Exception e) {
-                    assertThat(
-                            e.getMessage(),
-                            containsString("The file LOG does not exist on the TaskExecutor."));
-                }
+        configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, 0);
+        configuration.setInteger(
+                NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+        configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+        try (TaskSubmissionTestEnvironment env =
+                new Builder(jobId)
+                        .setConfiguration(configuration)
+                        .setLocalCommunication(false)
+                        .build(EXECUTOR_RESOURCE.getExecutor())) {
+            TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+            try {
+                CompletableFuture<TransientBlobKey> logFuture =
+                        tmGateway.requestFileUploadByType(FileType.LOG, timeout);
+                logFuture.get();
+            } catch (Exception e) {
+                assertThat(
+                        e.getMessage(),
+                        containsString("The file LOG does not exist on the TaskExecutor."));
             }
         }
     }