You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/18 02:43:19 UTC

[flink] branch master updated (f734fa717a8 -> 93c834be953)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from f734fa717a8 [FLINK-30029][ci] Parse version/classifier separately
     new 97c9abf9791 [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
     new 93c834be953 [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/network/ConnectionID.java     |  22 ++--
 .../runtime/io/network/NetworkClientHandler.java   |   2 +
 .../CreditBasedPartitionRequestClientHandler.java  |  37 ++++++-
 .../network/netty/NettyPartitionRequestClient.java |  16 ++-
 .../netty/PartitionRequestClientFactory.java       |   4 +
 .../runtime/shuffle/NettyShuffleDescriptor.java    |  38 +++++--
 .../ResultPartitionDeploymentDescriptorTest.java   |  44 ++++----
 .../runtime/deployment/ShuffleDescriptorTest.java  |  13 ++-
 .../netty/ClientTransportErrorHandlingTest.java    | 119 ++++++++++-----------
 ...editBasedPartitionRequestClientHandlerTest.java |   4 +
 .../netty/NettyPartitionRequestClientTest.java     |   4 +-
 .../runtime/io/network/netty/NettyTestUtil.java    |   4 +-
 .../netty/PartitionRequestClientFactoryTest.java   | 119 ++++++++++++---------
 .../partition/consumer/InputChannelBuilder.java    |   3 +-
 .../util/NettyShuffleDescriptorBuilder.java        |   8 +-
 15 files changed, 269 insertions(+), 168 deletions(-)


[flink] 01/02: [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Nov 16 21:05:24 2022 +0800

    [hotfix] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
---
 .../ResultPartitionDeploymentDescriptorTest.java   |  39 ++++----
 .../netty/ClientTransportErrorHandlingTest.java    | 106 ++++++++++-----------
 .../netty/PartitionRequestClientFactoryTest.java   |  99 ++++++++++---------
 3 files changed, 122 insertions(+), 122 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 33c79a1be4a..1fe6b9e329e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -31,20 +31,17 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ResultPartitionDeploymentDescriptor}. */
-public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
+class ResultPartitionDeploymentDescriptorTest {
     private static final IntermediateDataSetID resultId = new IntermediateDataSetID();
     private static final int numberOfPartitions = 5;
 
@@ -78,18 +75,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 
     /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
     @Test
-    public void testSerializationOfUnknownShuffleDescriptor() throws IOException {
+    void testSerializationOfUnknownShuffleDescriptor() throws IOException {
         ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID);
         ShuffleDescriptor shuffleDescriptorCopy =
                 CommonTestUtils.createCopySerializable(shuffleDescriptor);
-        assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class));
-        assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-        assertThat(shuffleDescriptorCopy.isUnknown(), is(true));
+        assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class);
+        assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+        assertThat(shuffleDescriptorCopy.isUnknown()).isTrue();
     }
 
     /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. */
     @Test
-    public void testSerializationWithNettyShuffleDescriptor() throws IOException {
+    void testSerializationWithNettyShuffleDescriptor() throws IOException {
         ShuffleDescriptor shuffleDescriptor =
                 new NettyShuffleDescriptor(
                         producerLocation,
@@ -99,13 +96,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
         ResultPartitionDeploymentDescriptor copy =
                 createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
 
-        assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class));
+        assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class);
         NettyShuffleDescriptor shuffleDescriptorCopy =
                 (NettyShuffleDescriptor) copy.getShuffleDescriptor();
-        assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-        assertThat(shuffleDescriptorCopy.isUnknown(), is(false));
-        assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true));
-        assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
+        assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+        assertThat(shuffleDescriptorCopy.isUnknown()).isFalse();
+        assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation)).isTrue();
+        assertThat(connectionID).isEqualTo(shuffleDescriptorCopy.getConnectionId());
     }
 
     private static ResultPartitionDeploymentDescriptor
@@ -121,10 +118,10 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 
     private static void verifyResultPartitionDeploymentDescriptorCopy(
             ResultPartitionDeploymentDescriptor copy) {
-        assertThat(copy.getResultId(), is(resultId));
-        assertThat(copy.getTotalNumberOfPartitions(), is(numberOfPartitions));
-        assertThat(copy.getPartitionId(), is(partitionId));
-        assertThat(copy.getPartitionType(), is(partitionType));
-        assertThat(copy.getNumberOfSubpartitions(), is(numberOfSubpartitions));
+        assertThat(resultId).isEqualTo(copy.getResultId());
+        assertThat(numberOfPartitions).isEqualTo(copy.getTotalNumberOfPartitions());
+        assertThat(partitionId).isEqualTo(copy.getPartitionId());
+        assertThat(partitionType).isEqualTo(copy.getPartitionType());
+        assertThat(numberOfSubpartitions).isEqualTo(copy.getNumberOfSubpartitions());
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index f32e0a9b590..5830620d893 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAda
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -52,11 +52,8 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
@@ -67,14 +64,14 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class ClientTransportErrorHandlingTest {
+class ClientTransportErrorHandlingTest {
 
     /**
      * Verifies that failed client requests via {@link PartitionRequestClient} are correctly
      * attributed to the respective {@link RemoteInputChannel}.
      */
     @Test
-    public void testExceptionOnWrite() throws Exception {
+    void testExceptionOnWrite() throws Exception {
 
         NettyProtocol protocol =
                 new NettyProtocol(
@@ -146,14 +143,13 @@ public class ClientTransportErrorHandlingTest {
 
         // Second request is *not* successful
         requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
-
         // Wait for the notification and it could confirm all the request operations are done
-        if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
-            fail(
-                    "Timed out after waiting for "
-                            + TestingUtils.TESTING_DURATION.toMillis()
-                            + " ms to be notified about the channel error.");
-        }
+        assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
+                .withFailMessage(
+                        "Timed out after waiting for "
+                                + TestingUtils.TESTING_DURATION.toMillis()
+                                + " ms to be notified about the channel error.")
+                .isTrue();
 
         // Only the second channel should be notified about the error
         verify(rich[0], times(0)).onError(any(LocalTransportException.class));
@@ -166,7 +162,7 @@ public class ClientTransportErrorHandlingTest {
      * RemoteTransportException} instances.
      */
     @Test
-    public void testWrappingOfRemoteErrorMessage() throws Exception {
+    void testWrappingOfRemoteErrorMessage() throws Exception {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -187,14 +183,12 @@ public class ClientTransportErrorHandlingTest {
                                 new RuntimeException("Expected test exception"),
                                 rich[0].getInputChannelId()));
 
-        try {
-            // Exception should not reach end of pipeline...
-            ch.checkException();
-        } catch (Exception e) {
-            fail(
-                    "The exception reached the end of the pipeline and "
-                            + "was not handled correctly by the last handler.");
-        }
+        // Exception should not reach end of pipeline...
+        assertThatNoException()
+                .describedAs(
+                        "The exception reached the end of the pipeline and "
+                                + "was not handled correctly by the last handler.")
+                .isThrownBy(ch::checkException);
 
         verify(rich[0], times(1)).onError(isA(RemoteTransportException.class));
         verify(rich[1], never()).onError(any(Throwable.class));
@@ -205,14 +199,12 @@ public class ClientTransportErrorHandlingTest {
                         new NettyMessage.ErrorResponse(
                                 new RuntimeException("Expected test exception")));
 
-        try {
-            // Exception should not reach end of pipeline...
-            ch.checkException();
-        } catch (Exception e) {
-            fail(
-                    "The exception reached the end of the pipeline and "
-                            + "was not handled correctly by the last handler.");
-        }
+        // Exception should not reach end of pipeline...
+        assertThatNoException()
+                .describedAs(
+                        "The exception reached the end of the pipeline and "
+                                + "was not handled correctly by the last handler.")
+                .isThrownBy(ch::checkException);
 
         verify(rich[0], times(2)).onError(isA(RemoteTransportException.class));
         verify(rich[1], times(1)).onError(isA(RemoteTransportException.class));
@@ -223,7 +215,7 @@ public class ClientTransportErrorHandlingTest {
      * RemoteTransportException}.
      */
     @Test
-    public void testExceptionOnRemoteClose() throws Exception {
+    void testExceptionOnRemoteClose() throws Exception {
 
         NettyProtocol protocol =
                 new NettyProtocol(
@@ -275,12 +267,12 @@ public class ClientTransportErrorHandlingTest {
         ch.writeAndFlush(Unpooled.buffer().writerIndex(16));
 
         // Wait for the notification
-        if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
-            fail(
-                    "Timed out after waiting for "
-                            + TestingUtils.TESTING_DURATION.toMillis()
-                            + " ms to be notified about remote connection close.");
-        }
+        assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
+                .withFailMessage(
+                        "Timed out after waiting for "
+                                + TestingUtils.TESTING_DURATION.toMillis()
+                                + " ms to be notified about remote connection close.")
+                .isTrue();
 
         // All the registered channels should be notified.
         for (RemoteInputChannel r : rich) {
@@ -292,7 +284,7 @@ public class ClientTransportErrorHandlingTest {
 
     /** Verifies that fired Exceptions are handled correctly by the pipeline. */
     @Test
-    public void testExceptionCaught() throws Exception {
+    void testExceptionCaught() throws Exception {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -308,14 +300,12 @@ public class ClientTransportErrorHandlingTest {
 
         ch.pipeline().fireExceptionCaught(new Exception());
 
-        try {
-            // Exception should not reach end of pipeline...
-            ch.checkException();
-        } catch (Exception e) {
-            fail(
-                    "The exception reached the end of the pipeline and "
-                            + "was not handled correctly by the last handler.");
-        }
+        // Exception should not reach end of pipeline...
+        assertThatNoException()
+                .describedAs(
+                        "The exception reached the end of the pipeline and "
+                                + "was not handled correctly by the last handler.")
+                .isThrownBy(ch::checkException);
 
         // ...but all the registered channels should be notified.
         for (RemoteInputChannel r : rich) {
@@ -328,7 +318,7 @@ public class ClientTransportErrorHandlingTest {
      * instance of {@link RemoteTransportException}.
      */
     @Test
-    public void testConnectionResetByPeer() throws Throwable {
+    void testConnectionResetByPeer() throws Throwable {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -345,13 +335,13 @@ public class ClientTransportErrorHandlingTest {
                                 Throwable cause = (Throwable) invocation.getArguments()[0];
 
                                 try {
-                                    assertEquals(RemoteTransportException.class, cause.getClass());
-                                    assertNotEquals("Connection reset by peer", cause.getMessage());
+                                    assertThat(cause).isInstanceOf(RemoteTransportException.class);
+                                    assertThat(cause)
+                                            .hasMessageNotContaining("Connection reset by peer");
 
-                                    assertEquals(IOException.class, cause.getCause().getClass());
-                                    assertEquals(
-                                            "Connection reset by peer",
-                                            cause.getCause().getMessage());
+                                    assertThat(cause.getCause()).isInstanceOf(IOException.class);
+                                    assertThat(cause.getCause())
+                                            .hasMessage("Connection reset by peer");
                                 } catch (Throwable t) {
                                     error[0] = t;
                                 }
@@ -364,12 +354,12 @@ public class ClientTransportErrorHandlingTest {
 
         ch.pipeline().fireExceptionCaught(new IOException("Connection reset by peer"));
 
-        assertNull(error[0]);
+        assertThat(error[0]).isNull();
     }
 
     /** Verifies that the channel is closed if there is an error *during* error notification. */
     @Test
-    public void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
+    void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -382,7 +372,7 @@ public class ClientTransportErrorHandlingTest {
 
         ch.pipeline().fireExceptionCaught(new Exception());
 
-        assertFalse(ch.isActive());
+        assertThat(ch.isActive()).isFalse();
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index d2b57c5409d..009fe74915c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -21,20 +21,24 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -43,23 +47,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 
 /** {@link PartitionRequestClientFactory} test. */
-@RunWith(Parameterized.class)
+@ExtendWith(ParameterizedTestExtension.class)
 public class PartitionRequestClientFactoryTest extends TestLogger {
-    @Parameterized.Parameter public boolean connectionReuseEnabled;
+    @Parameter public boolean connectionReuseEnabled;
 
-    @Parameterized.Parameters(name = "connection reuse enabled = {0}")
-    public static Object[] parameters() {
-        return new Object[][] {new Object[] {true}, new Object[] {false}};
+    @Parameters(name = "connectionReuseEnabled={0}")
+    public static Collection<Boolean> parameters() {
+        return Arrays.asList(false, true);
     }
 
-    @Test
-    public void testInterruptsNotCached() throws Exception {
+    @TestTemplate
+    void testInterruptsNotCached() throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
         try {
             AwaitingNettyClient nettyClient =
@@ -100,8 +104,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         interrupted.get();
     }
 
-    @Test
-    public void testExceptionsAreNotCached() throws Exception {
+    @TestTemplate
+    void testExceptionsAreNotCached() throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
 
         try {
@@ -111,12 +115,9 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                             connectionReuseEnabled);
 
             final ConnectionID connectionID = nettyServerAndClient.getConnectionID(0);
-            try {
-                factory.createPartitionRequestClient(connectionID);
-                fail("Expected the first request to fail.");
-            } catch (RemoteTransportException expected) {
-                // expected
-            }
+            assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID))
+                    .withFailMessage("Expected the first request to fail.")
+                    .isInstanceOf(RemoteTransportException.class);
 
             factory.createPartitionRequestClient(connectionID);
         } finally {
@@ -125,8 +126,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         }
     }
 
-    @Test
-    public void testReuseNettyPartitionRequestClient() throws Exception {
+    @TestTemplate
+    void testReuseNettyPartitionRequestClient() throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
         try {
             checkReuseNettyPartitionRequestClient(nettyServerAndClient, 1);
@@ -155,11 +156,11 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     nettyServerAndClient.getConnectionID((int) (Math.random() * Integer.MAX_VALUE));
             set.add(factory.createPartitionRequestClient(connectionID));
         }
-        assertTrue(set.size() <= maxNumberOfConnections);
+        assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
-    @Test
-    public void testNettyClientConnectRetry() throws Exception {
+    @TestTemplate
+    void testNettyClientConnectRetry() throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =
                 new UnstableNettyClient(serverAndClient.client(), 2);
@@ -175,23 +176,31 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
     }
 
     // see https://issues.apache.org/jira/browse/FLINK-18821
-    @Test(expected = IOException.class)
-    public void testFailureReportedToSubsequentRequests() throws Exception {
+    @TestTemplate
+    void testFailureReportedToSubsequentRequests() throws Exception {
         PartitionRequestClientFactory factory =
                 new PartitionRequestClientFactory(
                         new FailingNettyClient(), 2, 1, connectionReuseEnabled);
-        try {
-            factory.createPartitionRequestClient(
-                    new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
-        } catch (Exception e) {
-            // expected
-        }
-        factory.createPartitionRequestClient(
-                new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
+
+        assertThatThrownBy(
+                () ->
+                        factory.createPartitionRequestClient(
+                                new ConnectionID(
+                                        new InetSocketAddress(InetAddress.getLocalHost(), 8080),
+                                        0)));
+
+        assertThatThrownBy(
+                        () ->
+                                factory.createPartitionRequestClient(
+                                        new ConnectionID(
+                                                new InetSocketAddress(
+                                                        InetAddress.getLocalHost(), 8080),
+                                                0)))
+                .isInstanceOf(IOException.class);
     }
 
-    @Test(expected = IOException.class)
-    public void testNettyClientConnectRetryFailure() throws Exception {
+    @TestTemplate
+    void testNettyClientConnectRetryFailure() throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =
                 new UnstableNettyClient(serverAndClient.client(), 3);
@@ -201,16 +210,20 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     new PartitionRequestClientFactory(
                             unstableNettyClient, 2, 1, connectionReuseEnabled);
 
-            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
-
+            assertThatThrownBy(
+                            () -> {
+                                factory.createPartitionRequestClient(
+                                        serverAndClient.getConnectionID(0));
+                            })
+                    .isInstanceOf(IOException.class);
         } finally {
             serverAndClient.client().shutdown();
             serverAndClient.server().shutdown();
         }
     }
 
-    @Test
-    public void testNettyClientConnectRetryMultipleThread() throws Exception {
+    @TestTemplate
+    void testNettyClientConnectRetryMultipleThread() throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =
                 new UnstableNettyClient(serverAndClient.client(), 2);
@@ -245,7 +258,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     NettyPartitionRequestClient client;
                     try {
                         client = runnableFuture.get();
-                        assertNotNull(client);
+                        assertThat(client).isNotNull();
                     } catch (Exception e) {
                         System.out.println(e.getMessage());
                         fail();


[flink] 02/02: [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93c834be953f1336adb3ec5b5bf759a20e25eddf
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Nov 16 21:24:23 2022 +0800

    [FLINK-29639] Print resourceId of remote taskmanager when encounter transport exception.
    
    This closes #21331
---
 .../flink/runtime/io/network/ConnectionID.java     | 22 +++++++++----
 .../runtime/io/network/NetworkClientHandler.java   |  2 ++
 .../CreditBasedPartitionRequestClientHandler.java  | 37 +++++++++++++++++++--
 .../network/netty/NettyPartitionRequestClient.java | 16 +++++++--
 .../netty/PartitionRequestClientFactory.java       |  4 +++
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 38 +++++++++++++++-------
 .../ResultPartitionDeploymentDescriptorTest.java   |  5 +--
 .../runtime/deployment/ShuffleDescriptorTest.java  | 13 ++++++--
 .../netty/ClientTransportErrorHandlingTest.java    | 13 +++++---
 ...editBasedPartitionRequestClientHandlerTest.java |  4 +++
 .../netty/NettyPartitionRequestClientTest.java     |  4 ++-
 .../runtime/io/network/netty/NettyTestUtil.java    |  4 ++-
 .../netty/PartitionRequestClientFactoryTest.java   | 22 +++++++++----
 .../partition/consumer/InputChannelBuilder.java    |  3 +-
 .../util/NettyShuffleDescriptorBuilder.java        |  8 ++---
 15 files changed, 148 insertions(+), 47 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 6cb0fa29f20..fb60340c673 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -43,18 +44,26 @@ public class ConnectionID implements Serializable {
 
     private final int connectionIndex;
 
+    private final ResourceID resourceID;
+
     public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
         this(
+                connectionInfo.getResourceID(),
                 new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()),
                 connectionIndex);
     }
 
-    public ConnectionID(InetSocketAddress address, int connectionIndex) {
+    public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) {
+        this.resourceID = checkNotNull(resourceID);
         this.address = checkNotNull(address);
         checkArgument(connectionIndex >= 0);
         this.connectionIndex = connectionIndex;
     }
 
+    public ResourceID getResourceID() {
+        return resourceID;
+    }
+
     public InetSocketAddress getAddress() {
         return address;
     }
@@ -75,15 +84,14 @@ public class ConnectionID implements Serializable {
         }
 
         final ConnectionID ra = (ConnectionID) other;
-        if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
-            return false;
-        }
-
-        return true;
+        return ra.getAddress().equals(address)
+                && ra.getConnectionIndex() == connectionIndex
+                && ra.getResourceID().equals(resourceID);
     }
 
     @Override
     public String toString() {
-        return address + " [" + connectionIndex + "]";
+        return String.format(
+                "%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
index 18ae9e6d9a9..354da6c3a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
@@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler {
 
     void cancelRequestFor(InputChannelID inputChannelId);
 
+    void setConnectionId(ConnectionID connectionId);
+
     /**
      * Return whether there is channel error.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 00d8e6e03f8..6480b49fef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Channel handler to read the messages of buffer response or error response from the producer, to
  * write and flush the unannounced credits for the producer.
@@ -74,6 +77,8 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
      */
     private volatile ChannelHandlerContext ctx;
 
+    private ConnectionID connectionID;
+
     // ------------------------------------------------------------------------
     // Input channel/receiver registration
     // ------------------------------------------------------------------------
@@ -128,6 +133,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                     new RemoteTransportException(
                             "Connection unexpectedly closed by remote task manager '"
                                     + remoteAddr
+                                    + " [ "
+                                    + connectionID.getResourceID().getStringWithMetadata()
+                                    + " ] "
                                     + "'. "
                                     + "This might indicate that the remote task manager was lost.",
                             remoteAddr));
@@ -157,6 +165,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                         new RemoteTransportException(
                                 "Lost connection to task manager '"
                                         + remoteAddr
+                                        + " [ "
+                                        + connectionID.getResourceID().getStringWithMetadata()
+                                        + " ] "
                                         + "'. "
                                         + "This indicates that the remote task manager was lost.",
                                 remoteAddr,
@@ -166,7 +177,10 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                 tex =
                         new LocalTransportException(
                                 String.format(
-                                        "%s (connection to '%s')", cause.getMessage(), remoteAddr),
+                                        "%s (connection to '%s [%s]')",
+                                        cause.getMessage(),
+                                        remoteAddr,
+                                        connectionID.getResourceID().getStringWithMetadata()),
                                 localAddr,
                                 cause);
             }
@@ -212,6 +226,11 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
         return channelError.get() != null;
     }
 
+    @Override
+    public void setConnectionId(ConnectionID connectionId) {
+        this.connectionID = checkNotNull(connectionId);
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
         writeAndFlushNextMessageIfPossible(ctx.channel());
@@ -287,7 +306,12 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
             if (error.isFatalError()) {
                 notifyAllChannelsOfErrorAndClose(
                         new RemoteTransportException(
-                                "Fatal error at remote task manager '" + remoteAddr + "'.",
+                                "Fatal error at remote task manager '"
+                                        + remoteAddr
+                                        + " [ "
+                                        + connectionID.getResourceID().getStringWithMetadata()
+                                        + " ] "
+                                        + "'.",
                                 remoteAddr,
                                 error.cause));
             } else {
@@ -299,7 +323,14 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                     } else {
                         inputChannel.onError(
                                 new RemoteTransportException(
-                                        "Error at remote task manager '" + remoteAddr + "'.",
+                                        "Error at remote task manager '"
+                                                + remoteAddr
+                                                + " [ "
+                                                + connectionID
+                                                        .getResourceID()
+                                                        .getStringWithMetadata()
+                                                + " ] "
+                                                + "'.",
                                         remoteAddr,
                                         error.cause));
                     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 52e1b68d39d..de2fc747ecb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -77,6 +77,7 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
         this.clientHandler = checkNotNull(clientHandler);
         this.connectionId = checkNotNull(connectionId);
         this.clientFactory = checkNotNull(clientFactory);
+        clientHandler.setConnectionId(connectionId);
     }
 
     boolean canBeDisposed() {
@@ -138,8 +139,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                             inputChannel.onError(
                                     new LocalTransportException(
                                             String.format(
-                                                    "Sending the partition request to '%s (#%d)' failed.",
+                                                    "Sending the partition request to '%s [%s] (#%d)' failed.",
                                                     connectionId.getAddress(),
+                                                    connectionId
+                                                            .getResourceID()
+                                                            .getStringWithMetadata(),
                                                     connectionId.getConnectionIndex()),
                                             future.channel().localAddress(),
                                             future.cause()));
@@ -197,8 +201,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                                     inputChannel.onError(
                                             new LocalTransportException(
                                                     String.format(
-                                                            "Sending the task event to '%s (#%d)' failed.",
+                                                            "Sending the task event to '%s [%s] (#%d)' failed.",
                                                             connectionId.getAddress(),
+                                                            connectionId
+                                                                    .getResourceID()
+                                                                    .getStringWithMetadata(),
                                                             connectionId.getConnectionIndex()),
                                                     future.channel().localAddress(),
                                                     future.cause()));
@@ -275,7 +282,10 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
             final SocketAddress localAddr = tcpChannel.localAddress();
             final SocketAddress remoteAddr = tcpChannel.remoteAddress();
             throw new LocalTransportException(
-                    String.format("Channel to '%s' closed.", remoteAddr), localAddr);
+                    String.format(
+                            "Channel to '%s [%s]' closed.",
+                            remoteAddr, connectionId.getResourceID().getStringWithMetadata()),
+                    localAddr);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 7463cad9e70..ca92f052ffe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -80,6 +80,7 @@ class PartitionRequestClientFactory {
         // We map the input ConnectionID to a new value to restrict the number of tcp connections
         connectionId =
                 new ConnectionID(
+                        connectionId.getResourceID(),
                         connectionId.getAddress(),
                         connectionId.getConnectionIndex() % maxNumberOfConnections);
         while (true) {
@@ -164,6 +165,9 @@ class PartitionRequestClientFactory {
             throw new RemoteTransportException(
                     "Connecting to remote task manager '"
                             + connectionId.getAddress()
+                            + " [ "
+                            + connectionId.getResourceID().getStringWithMetadata()
+                            + " ] "
                             + "' has failed. This might indicate that the remote task "
                             + "manager has been lost.",
                     connectionId.getAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index 9831e949d30..e73e305497f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -48,7 +48,10 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
     }
 
     public ConnectionID getConnectionId() {
-        return partitionConnectionInfo.getConnectionId();
+        return new ConnectionID(
+                producerLocation,
+                partitionConnectionInfo.getAddress(),
+                partitionConnectionInfo.getConnectionIndex());
     }
 
     @Override
@@ -66,9 +69,10 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
     }
 
     /** Information for connection to partition producer for shuffle exchange. */
-    @FunctionalInterface
     public interface PartitionConnectionInfo extends Serializable {
-        ConnectionID getConnectionId();
+        InetSocketAddress getAddress();
+
+        int getConnectionIndex();
     }
 
     /**
@@ -81,16 +85,22 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 
         private static final long serialVersionUID = 5992534320110743746L;
 
-        private final ConnectionID connectionID;
+        private final InetSocketAddress address;
+
+        private final int connectionIndex;
 
         @VisibleForTesting
-        public NetworkPartitionConnectionInfo(ConnectionID connectionID) {
-            this.connectionID = connectionID;
+        public NetworkPartitionConnectionInfo(InetSocketAddress address, int connectionIndex) {
+            this.address = address;
+            this.connectionIndex = connectionIndex;
         }
 
-        @Override
-        public ConnectionID getConnectionId() {
-            return connectionID;
+        public InetSocketAddress getAddress() {
+            return address;
+        }
+
+        public int getConnectionIndex() {
+            return connectionIndex;
         }
 
         static NetworkPartitionConnectionInfo fromProducerDescriptor(
@@ -98,7 +108,7 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
             InetSocketAddress address =
                     new InetSocketAddress(
                             producerDescriptor.getAddress(), producerDescriptor.getDataPort());
-            return new NetworkPartitionConnectionInfo(new ConnectionID(address, connectionIndex));
+            return new NetworkPartitionConnectionInfo(address, connectionIndex);
         }
     }
 
@@ -111,7 +121,13 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
         INSTANCE;
 
         @Override
-        public ConnectionID getConnectionId() {
+        public InetSocketAddress getAddress() {
+            throw new UnsupportedOperationException(
+                    "Local execution does not support shuffle connection.");
+        }
+
+        @Override
+        public int getConnectionIndex() {
             throw new UnsupportedOperationException(
                     "Local execution does not support shuffle connection.");
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 1fe6b9e329e..3ea6def9e6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -71,7 +71,8 @@ class ResultPartitionDeploymentDescriptorTest {
 
     private static final ResourceID producerLocation = new ResourceID("producerLocation");
     private static final InetSocketAddress address = new InetSocketAddress("localhost", 10000);
-    private static final ConnectionID connectionID = new ConnectionID(address, connectionIndex);
+    private static final ConnectionID connectionID =
+            new ConnectionID(producerLocation, address, connectionIndex);
 
     /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
     @Test
@@ -90,7 +91,7 @@ class ResultPartitionDeploymentDescriptorTest {
         ShuffleDescriptor shuffleDescriptor =
                 new NettyShuffleDescriptor(
                         producerLocation,
-                        new NetworkPartitionConnectionInfo(connectionID),
+                        new NetworkPartitionConnectionInfo(address, connectionIndex),
                         resultPartitionID);
 
         ResultPartitionDeploymentDescriptor copy =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
index bb0e96c9250..920380aad90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
@@ -66,9 +66,10 @@ public class ShuffleDescriptorTest extends TestLogger {
                             jobID, localPartitionId, consumerResourceID);
 
             ResultPartitionID remotePartitionId = new ResultPartitionID();
+            ResourceID remoteResourceID = ResourceID.generate();
             ResultPartitionDeploymentDescriptor remotePartition =
                     createResultPartitionDeploymentDescriptor(
-                            jobID, remotePartitionId, ResourceID.generate());
+                            jobID, remotePartitionId, remoteResourceID);
 
             ResultPartitionID unknownPartitionId = new ResultPartitionID();
 
@@ -118,7 +119,15 @@ public class ShuffleDescriptorTest extends TestLogger {
                         remotePartitionId);
                 nettyShuffleDescriptor = (NettyShuffleDescriptor) remoteShuffleDescriptor;
                 assertThat(nettyShuffleDescriptor.isLocalTo(consumerResourceID), is(false));
-                assertThat(nettyShuffleDescriptor.getConnectionId(), is(STUB_CONNECTION_ID));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getAddress(),
+                        is(STUB_CONNECTION_ID.getAddress()));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getConnectionIndex(),
+                        is(STUB_CONNECTION_ID.getConnectionIndex()));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getResourceID(),
+                        is(remoteResourceID));
             } else {
                 // Unknown (lazy deployment allowed)
                 verifyShuffleDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 5830620d893..fae2163c97e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -45,6 +46,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +67,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class ClientTransportErrorHandlingTest {
+    private static final ConnectionID CONNECTION_ID =
+            new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0);
 
     /**
      * Verifies that failed client requests via {@link PartitionRequestClient} are correctly
@@ -113,10 +117,7 @@ class ClientTransportErrorHandlingTest {
 
         PartitionRequestClient requestClient =
                 new NettyPartitionRequestClient(
-                        ch,
-                        handler,
-                        mock(ConnectionID.class),
-                        mock(PartitionRequestClientFactory.class));
+                        ch, handler, CONNECTION_ID, mock(PartitionRequestClientFactory.class));
 
         // Create input channels
         RemoteInputChannel[] rich =
@@ -396,7 +397,9 @@ class ClientTransportErrorHandlingTest {
     }
 
     private NetworkClientHandler getClientHandler(Channel ch) {
-        return ch.pipeline().get(NetworkClientHandler.class);
+        NetworkClientHandler networkClientHandler = ch.pipeline().get(NetworkClientHandler.class);
+        networkClientHandler.setConnectionId(CONNECTION_ID);
+        return networkClientHandler;
     }
 
     private RemoteInputChannel createRemoteInputChannel() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 1ea783d3e34..6b39979fac8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
@@ -62,6 +63,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
@@ -640,6 +642,8 @@ class CreditBasedPartitionRequestClientHandlerTest {
             Class<? extends TransportException> expectedClass, Exception cause) {
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
+        handler.setConnectionId(
+                new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0));
         EmbeddedChannel embeddedChannel =
                 new EmbeddedChannel(
                         // A test handler to trigger the exception.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index f2b66c42490..6ede9dbdc90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -287,7 +288,8 @@ public class NettyPartitionRequestClientTest {
         try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
             int port = availablePort.getPort();
             ConnectionID connectionID =
-                    new ConnectionID(new InetSocketAddress("localhost", port), 0);
+                    new ConnectionID(
+                            ResourceID.generate(), new InetSocketAddress("localhost", port), 0);
             NettyConfig config =
                     new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
             NettyClient nettyClient = new NettyClient(config);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 0ec9b5380da..5e4b5b68711 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.util.NetUtils;
 
@@ -224,8 +225,9 @@ public class NettyTestUtil {
             return client;
         }
 
-        ConnectionID getConnectionID(int connectionIndex) {
+        ConnectionID getConnectionID(ResourceID resourceID, int connectionIndex) {
             return new ConnectionID(
+                    resourceID,
                     new InetSocketAddress(
                             server.getConfig().getServerAddress(),
                             server.getConfig().getServerPort()),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 009fe74915c..c676046913d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -55,6 +56,8 @@ import static org.mockito.Mockito.mock;
 /** {@link PartitionRequestClientFactory} test. */
 @ExtendWith(ParameterizedTestExtension.class)
 public class PartitionRequestClientFactoryTest extends TestLogger {
+    private static final ResourceID RESOURCE_ID = ResourceID.generate();
+
     @Parameter public boolean connectionReuseEnabled;
 
     @Parameters(name = "connectionReuseEnabled={0}")
@@ -72,10 +75,11 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
 
             nettyClient.awaitForInterrupts = true;
-            connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(0));
+            connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
 
             nettyClient.awaitForInterrupts = false;
-            factory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(0));
+            factory.createPartitionRequestClient(
+                    nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
         } finally {
             nettyServerAndClient.client().shutdown();
             nettyServerAndClient.server().shutdown();
@@ -114,7 +118,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                             new UnstableNettyClient(nettyServerAndClient.client(), 1),
                             connectionReuseEnabled);
 
-            final ConnectionID connectionID = nettyServerAndClient.getConnectionID(0);
+            final ConnectionID connectionID = nettyServerAndClient.getConnectionID(RESOURCE_ID, 0);
             assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID))
                     .withFailMessage("Expected the first request to fail.")
                     .isInstanceOf(RemoteTransportException.class);
@@ -153,7 +157,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                         connectionReuseEnabled);
         for (int i = 0; i < Math.max(100, maxNumberOfConnections); i++) {
             final ConnectionID connectionID =
-                    nettyServerAndClient.getConnectionID((int) (Math.random() * Integer.MAX_VALUE));
+                    nettyServerAndClient.getConnectionID(
+                            RESOURCE_ID, (int) (Math.random() * Integer.MAX_VALUE));
             set.add(factory.createPartitionRequestClient(connectionID));
         }
         assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
@@ -169,7 +174,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                 new PartitionRequestClientFactory(
                         unstableNettyClient, 2, 1, connectionReuseEnabled);
 
-        factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
+        factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
 
         serverAndClient.client().shutdown();
         serverAndClient.server().shutdown();
@@ -186,6 +191,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                 () ->
                         factory.createPartitionRequestClient(
                                 new ConnectionID(
+                                        ResourceID.generate(),
                                         new InetSocketAddress(InetAddress.getLocalHost(), 8080),
                                         0)));
 
@@ -193,6 +199,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                         () ->
                                 factory.createPartitionRequestClient(
                                         new ConnectionID(
+                                                ResourceID.generate(),
                                                 new InetSocketAddress(
                                                         InetAddress.getLocalHost(), 8080),
                                                 0)))
@@ -213,7 +220,7 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
             assertThatThrownBy(
                             () -> {
                                 factory.createPartitionRequestClient(
-                                        serverAndClient.getConnectionID(0));
+                                        serverAndClient.getConnectionID(RESOURCE_ID, 0));
                             })
                     .isInstanceOf(IOException.class);
         } finally {
@@ -243,7 +250,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                                 try {
                                     client =
                                             factory.createPartitionRequestClient(
-                                                    serverAndClient.getConnectionID(0));
+                                                    serverAndClient.getConnectionID(
+                                                            RESOURCE_ID, 0));
                                 } catch (Exception e) {
                                     fail(e.getMessage());
                                 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
index bb5d24a736b..622a1392a03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -38,7 +39,7 @@ import static org.apache.flink.runtime.io.network.partition.consumer.SingleInput
 /** Builder for various {@link InputChannel} types. */
 public class InputChannelBuilder {
     public static final ConnectionID STUB_CONNECTION_ID =
-            new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+            new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 5000), 0);
 
     private int channelIndex = 0;
     private ResultPartitionID partitionId = new ResultPartitionID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index ee641da9f99..9de1228d9d1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
@@ -73,10 +72,11 @@ public class NettyShuffleDescriptorBuilder {
     }
 
     public NettyShuffleDescriptor buildRemote() {
-        ConnectionID connectionID =
-                new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
         return new NettyShuffleDescriptor(
-                producerLocation, new NetworkPartitionConnectionInfo(connectionID), id);
+                producerLocation,
+                new NetworkPartitionConnectionInfo(
+                        new InetSocketAddress(address, dataPort), connectionIndex),
+                id);
     }
 
     public NettyShuffleDescriptor buildLocal() {