You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text extraction.
Posted to commits@flink.apache.org by ch...@apache.org on 2023/01/19 09:48:35 UTC
[flink] branch master updated: [FLINK-30678][tests] Use random port
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e705090c89e [FLINK-30678][tests] Use random port
e705090c89e is described below
commit e705090c89ef9411304d54c57eac2f0080c522a7
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Thu Jan 19 10:48:23 2023 +0100
[FLINK-30678][tests] Use random port
---
.../apache/flink/client/program/ClientTest.java | 13 ------
.../flink/queryablestate/network/ClientTest.java | 8 +---
.../network/netty/NettyConnectionManagerTest.java | 11 ++----
.../netty/NettyPartitionRequestClientTest.java | 25 +++++-------
.../runtime/taskexecutor/TaskExecutorTest.java | 46 ++++++++++------------
5 files changed, 36 insertions(+), 67 deletions(-)
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 4d8f4403e81..6efc8fb0bd1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -54,9 +54,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.NetUtils;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -88,8 +86,6 @@ class ClientTest {
private Plan plan;
- private NetUtils.Port port;
-
private Configuration config;
private static final String TEST_EXECUTOR_NAME = "test_executor";
@@ -108,20 +104,11 @@ class ClientTest {
config = new Configuration();
config.setString(JobManagerOptions.ADDRESS, "localhost");
- port = NetUtils.getAvailablePort();
- config.setInteger(JobManagerOptions.PORT, port.getPort());
config.set(
AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue());
}
- @AfterEach
- void tearDown() throws Exception {
- if (port != null) {
- port.close();
- }
- }
-
private Configuration fromPackagedProgram(
final PackagedProgram program, final int parallelism, final boolean detached) {
final Configuration configuration = new Configuration();
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 383fe41ee18..193508ce16f 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -45,7 +45,6 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -255,13 +254,10 @@ public class ClientTest extends TestLogger {
Client<KvStateInternalRequest, KvStateResponse> client = null;
- try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ try {
client = new Client<>("Test Client", 1, serializer, stats);
- int availablePort = port.getPort();
-
- InetSocketAddress serverAddress =
- new InetSocketAddress(InetAddress.getLocalHost(), availablePort);
+ InetSocketAddress serverAddress = new InetSocketAddress(InetAddress.getLocalHost(), 0);
KvStateInternalRequest request =
new KvStateInternalRequest(new KvStateID(), new byte[0]);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index d313c594d13..f93381db59d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
@@ -48,11 +47,11 @@ public class NettyConnectionManagerTest {
// Expected number of arenas and threads
int numberOfSlots = 2;
NettyConnectionManager connectionManager;
- try (NetUtils.Port port = NetUtils.getAvailablePort()) {
+ {
NettyConfig config =
new NettyConfig(
InetAddress.getLocalHost(),
- port.getPort(),
+ 0,
1024,
numberOfSlots,
new Configuration());
@@ -117,11 +116,9 @@ public class NettyConnectionManagerTest {
flinkConfig.setInteger(NettyShuffleEnvironmentOptions.NUM_THREADS_SERVER, 4);
NettyConnectionManager connectionManager;
- try (NetUtils.Port port = NetUtils.getAvailablePort()) {
-
+ {
NettyConfig config =
- new NettyConfig(
- InetAddress.getLocalHost(), port.getPort(), 1024, 1337, flinkConfig);
+ new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1337, flinkConfig);
connectionManager = createNettyConnectionManager(config);
connectionManager.start();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index 6ede9dbdc90..ad34bdf363e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.ResumeConsumption;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
@@ -285,20 +284,16 @@ public class NettyPartitionRequestClientTest {
private NettyPartitionRequestClient createPartitionRequestClient(
Channel tcpChannel, NetworkClientHandler clientHandler, boolean connectionReuseEnabled)
throws Exception {
- try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
- int port = availablePort.getPort();
- ConnectionID connectionID =
- new ConnectionID(
- ResourceID.generate(), new InetSocketAddress("localhost", port), 0);
- NettyConfig config =
- new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
- NettyClient nettyClient = new NettyClient(config);
- PartitionRequestClientFactory partitionRequestClientFactory =
- new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
-
- return new NettyPartitionRequestClient(
- tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
- }
+ ConnectionID connectionID =
+ new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0);
+ NettyConfig config =
+ new NettyConfig(InetAddress.getLocalHost(), 0, 1024, 1, new Configuration());
+ NettyClient nettyClient = new NettyClient(config);
+ PartitionRequestClientFactory partitionRequestClientFactory =
+ new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
+
+ return new NettyPartitionRequestClient(
+ tcpChannel, clientHandler, connectionID, partitionRequestClientFactory);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index cc857ba0e1f..6d3a885ca43 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -109,7 +109,6 @@ import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Reference;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
@@ -2244,31 +2243,26 @@ public class TaskExecutorTest extends TestLogger {
@Test(timeout = 10000L)
public void testLogNotFoundHandling() throws Throwable {
- try (NetUtils.Port port = NetUtils.getAvailablePort()) {
- int dataPort = port.getPort();
-
- configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, dataPort);
- configuration.setInteger(
- NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
- configuration.setInteger(
- NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
- configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
-
- try (TaskSubmissionTestEnvironment env =
- new Builder(jobId)
- .setConfiguration(configuration)
- .setLocalCommunication(false)
- .build(EXECUTOR_RESOURCE.getExecutor())) {
- TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
- try {
- CompletableFuture<TransientBlobKey> logFuture =
- tmGateway.requestFileUploadByType(FileType.LOG, timeout);
- logFuture.get();
- } catch (Exception e) {
- assertThat(
- e.getMessage(),
- containsString("The file LOG does not exist on the TaskExecutor."));
- }
+ configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, 0);
+ configuration.setInteger(
+ NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
+ configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
+ configuration.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, "/i/dont/exist");
+
+ try (TaskSubmissionTestEnvironment env =
+ new Builder(jobId)
+ .setConfiguration(configuration)
+ .setLocalCommunication(false)
+ .build(EXECUTOR_RESOURCE.getExecutor())) {
+ TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
+ try {
+ CompletableFuture<TransientBlobKey> logFuture =
+ tmGateway.requestFileUploadByType(FileType.LOG, timeout);
+ logFuture.get();
+ } catch (Exception e) {
+ assertThat(
+ e.getMessage(),
+ containsString("The file LOG does not exist on the TaskExecutor."));
}
}
}