You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by xt...@apache.org on 2022/11/22 09:14:27 UTC

[flink] branch release-1.15 updated (28a877b8880 -> 7a4fb7c86fb)

This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 28a877b8880 [FLINK-16582][runtime][test] Fix the buffer leaks in NettyBufferPoolTest
     new d521b030890 [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
     new 7a4fb7c86fb [FLINK-29639][runtime] Print resourceId of remote taskmanager when encounter transport exception.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/io/network/ConnectionID.java     |  22 +++-
 .../runtime/io/network/NetworkClientHandler.java   |   2 +
 .../CreditBasedPartitionRequestClientHandler.java  |  37 +++++-
 .../network/netty/NettyPartitionRequestClient.java |  16 ++-
 .../netty/PartitionRequestClientFactory.java       |   4 +
 .../runtime/shuffle/NettyShuffleDescriptor.java    |  38 ++++--
 .../ResultPartitionDeploymentDescriptorTest.java   |  46 ++++---
 .../runtime/deployment/ShuffleDescriptorTest.java  |  13 +-
 .../netty/ClientTransportErrorHandlingTest.java    | 119 ++++++++---------
 ...editBasedPartitionRequestClientHandlerTest.java |   4 +
 .../netty/NettyPartitionRequestClientTest.java     |   4 +-
 .../runtime/io/network/netty/NettyTestUtil.java    |   4 +-
 .../netty/PartitionRequestClientFactoryTest.java   | 143 ++++++++++++---------
 .../partition/consumer/InputChannelBuilder.java    |   3 +-
 .../util/NettyShuffleDescriptorBuilder.java        |   8 +-
 15 files changed, 280 insertions(+), 183 deletions(-)


[flink] 01/02: [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d521b030890c9c14749ff22054849ffff9fb92bd
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Nov 16 21:05:24 2022 +0800

    [FLINK-29639][runtime] Migrate ClientTransportErrorHandlingTest & PartitionRequestClientFactoryTest & ResultPartitionDeploymentDescriptorTest to JUnit5 and AssertJ.
    
    (cherry picked from commit 97c9abf9791d8c08db27f0ef2a2a78488321b0a1)
---
 .../ResultPartitionDeploymentDescriptorTest.java   |  41 +++---
 .../netty/ClientTransportErrorHandlingTest.java    | 106 +++++++---------
 .../netty/PartitionRequestClientFactoryTest.java   | 141 ++++++++++++---------
 .../util/NettyShuffleDescriptorBuilder.java        |   1 -
 4 files changed, 146 insertions(+), 143 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 39e91a45a1f..423512f7862 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -31,19 +31,16 @@ import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionC
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.UnknownShuffleDescriptor;
-import org.apache.flink.util.TestLogger;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link ResultPartitionDeploymentDescriptor}. */
-public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
+class ResultPartitionDeploymentDescriptorTest {
     private static final IntermediateDataSetID resultId = new IntermediateDataSetID();
     private static final int numberOfPartitions = 5;
 
@@ -73,18 +70,18 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 
     /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
     @Test
-    public void testSerializationOfUnknownShuffleDescriptor() throws IOException {
+    void testSerializationOfUnknownShuffleDescriptor() throws IOException {
         ShuffleDescriptor shuffleDescriptor = new UnknownShuffleDescriptor(resultPartitionID);
         ShuffleDescriptor shuffleDescriptorCopy =
                 CommonTestUtils.createCopySerializable(shuffleDescriptor);
-        assertThat(shuffleDescriptorCopy, instanceOf(UnknownShuffleDescriptor.class));
-        assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-        assertThat(shuffleDescriptorCopy.isUnknown(), is(true));
+        assertThat(shuffleDescriptorCopy).isInstanceOf(UnknownShuffleDescriptor.class);
+        assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+        assertThat(shuffleDescriptorCopy.isUnknown()).isTrue();
     }
 
     /** Tests simple de/serialization with {@link NettyShuffleDescriptor}. */
     @Test
-    public void testSerializationWithNettyShuffleDescriptor() throws IOException {
+    void testSerializationWithNettyShuffleDescriptor() throws IOException {
         ShuffleDescriptor shuffleDescriptor =
                 new NettyShuffleDescriptor(
                         producerLocation,
@@ -94,13 +91,13 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
         ResultPartitionDeploymentDescriptor copy =
                 createCopyAndVerifyResultPartitionDeploymentDescriptor(shuffleDescriptor);
 
-        assertThat(copy.getShuffleDescriptor(), instanceOf(NettyShuffleDescriptor.class));
+        assertThat(copy.getShuffleDescriptor()).isInstanceOf(NettyShuffleDescriptor.class);
         NettyShuffleDescriptor shuffleDescriptorCopy =
                 (NettyShuffleDescriptor) copy.getShuffleDescriptor();
-        assertThat(shuffleDescriptorCopy.getResultPartitionID(), is(resultPartitionID));
-        assertThat(shuffleDescriptorCopy.isUnknown(), is(false));
-        assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation), is(true));
-        assertThat(shuffleDescriptorCopy.getConnectionId(), is(connectionID));
+        assertThat(resultPartitionID).isEqualTo(shuffleDescriptorCopy.getResultPartitionID());
+        assertThat(shuffleDescriptorCopy.isUnknown()).isFalse();
+        assertThat(shuffleDescriptorCopy.isLocalTo(producerLocation)).isTrue();
+        assertThat(connectionID).isEqualTo(shuffleDescriptorCopy.getConnectionId());
     }
 
     private static ResultPartitionDeploymentDescriptor
@@ -116,11 +113,11 @@ public class ResultPartitionDeploymentDescriptorTest extends TestLogger {
 
     private static void verifyResultPartitionDeploymentDescriptorCopy(
             ResultPartitionDeploymentDescriptor copy) {
-        assertThat(copy.getResultId(), is(resultId));
-        assertThat(copy.getTotalNumberOfPartitions(), is(numberOfPartitions));
-        assertThat(copy.getPartitionId(), is(partitionId));
-        assertThat(copy.getPartitionType(), is(partitionType));
-        assertThat(copy.getNumberOfSubpartitions(), is(numberOfSubpartitions));
-        assertThat(copy.notifyPartitionDataAvailable(), is(true));
+        assertThat(resultId).isEqualTo(copy.getResultId());
+        assertThat(numberOfPartitions).isEqualTo(copy.getTotalNumberOfPartitions());
+        assertThat(partitionId).isEqualTo(copy.getPartitionId());
+        assertThat(partitionType).isEqualTo(copy.getPartitionType());
+        assertThat(numberOfSubpartitions).isEqualTo(copy.getNumberOfSubpartitions());
+        assertThat(copy.notifyPartitionDataAvailable()).isTrue();
     }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index f32e0a9b590..5830620d893 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -40,7 +40,7 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAda
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -52,11 +52,8 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
@@ -67,14 +64,14 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-public class ClientTransportErrorHandlingTest {
+class ClientTransportErrorHandlingTest {
 
     /**
      * Verifies that failed client requests via {@link PartitionRequestClient} are correctly
      * attributed to the respective {@link RemoteInputChannel}.
      */
     @Test
-    public void testExceptionOnWrite() throws Exception {
+    void testExceptionOnWrite() throws Exception {
 
         NettyProtocol protocol =
                 new NettyProtocol(
@@ -146,14 +143,13 @@ public class ClientTransportErrorHandlingTest {
 
         // Second request is *not* successful
         requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
-
         // Wait for the notification and it could confirm all the request operations are done
-        if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
-            fail(
-                    "Timed out after waiting for "
-                            + TestingUtils.TESTING_DURATION.toMillis()
-                            + " ms to be notified about the channel error.");
-        }
+        assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
+                .withFailMessage(
+                        "Timed out after waiting for "
+                                + TestingUtils.TESTING_DURATION.toMillis()
+                                + " ms to be notified about the channel error.")
+                .isTrue();
 
         // Only the second channel should be notified about the error
         verify(rich[0], times(0)).onError(any(LocalTransportException.class));
@@ -166,7 +162,7 @@ public class ClientTransportErrorHandlingTest {
      * RemoteTransportException} instances.
      */
     @Test
-    public void testWrappingOfRemoteErrorMessage() throws Exception {
+    void testWrappingOfRemoteErrorMessage() throws Exception {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -187,14 +183,12 @@ public class ClientTransportErrorHandlingTest {
                                 new RuntimeException("Expected test exception"),
                                 rich[0].getInputChannelId()));
 
-        try {
-            // Exception should not reach end of pipeline...
-            ch.checkException();
-        } catch (Exception e) {
-            fail(
-                    "The exception reached the end of the pipeline and "
-                            + "was not handled correctly by the last handler.");
-        }
+        // Exception should not reach end of pipeline...
+        assertThatNoException()
+                .describedAs(
+                        "The exception reached the end of the pipeline and "
+                                + "was not handled correctly by the last handler.")
+                .isThrownBy(ch::checkException);
 
         verify(rich[0], times(1)).onError(isA(RemoteTransportException.class));
         verify(rich[1], never()).onError(any(Throwable.class));
@@ -205,14 +199,12 @@ public class ClientTransportErrorHandlingTest {
                         new NettyMessage.ErrorResponse(
                                 new RuntimeException("Expected test exception")));
 
-        try {
-            // Exception should not reach end of pipeline...
-            ch.checkException();
-        } catch (Exception e) {
-            fail(
-                    "The exception reached the end of the pipeline and "
-                            + "was not handled correctly by the last handler.");
-        }
+        // Exception should not reach end of pipeline...
+        assertThatNoException()
+                .describedAs(
+                        "The exception reached the end of the pipeline and "
+                                + "was not handled correctly by the last handler.")
+                .isThrownBy(ch::checkException);
 
         verify(rich[0], times(2)).onError(isA(RemoteTransportException.class));
         verify(rich[1], times(1)).onError(isA(RemoteTransportException.class));
@@ -223,7 +215,7 @@ public class ClientTransportErrorHandlingTest {
      * RemoteTransportException}.
      */
     @Test
-    public void testExceptionOnRemoteClose() throws Exception {
+    void testExceptionOnRemoteClose() throws Exception {
 
         NettyProtocol protocol =
                 new NettyProtocol(
@@ -275,12 +267,12 @@ public class ClientTransportErrorHandlingTest {
         ch.writeAndFlush(Unpooled.buffer().writerIndex(16));
 
         // Wait for the notification
-        if (!sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS)) {
-            fail(
-                    "Timed out after waiting for "
-                            + TestingUtils.TESTING_DURATION.toMillis()
-                            + " ms to be notified about remote connection close.");
-        }
+        assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
+                .withFailMessage(
+                        "Timed out after waiting for "
+                                + TestingUtils.TESTING_DURATION.toMillis()
+                                + " ms to be notified about remote connection close.")
+                .isTrue();
 
         // All the registered channels should be notified.
         for (RemoteInputChannel r : rich) {
@@ -292,7 +284,7 @@ public class ClientTransportErrorHandlingTest {
 
     /** Verifies that fired Exceptions are handled correctly by the pipeline. */
     @Test
-    public void testExceptionCaught() throws Exception {
+    void testExceptionCaught() throws Exception {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -308,14 +300,12 @@ public class ClientTransportErrorHandlingTest {
 
         ch.pipeline().fireExceptionCaught(new Exception());
 
-        try {
-            // Exception should not reach end of pipeline...
-            ch.checkException();
-        } catch (Exception e) {
-            fail(
-                    "The exception reached the end of the pipeline and "
-                            + "was not handled correctly by the last handler.");
-        }
+        // Exception should not reach end of pipeline...
+        assertThatNoException()
+                .describedAs(
+                        "The exception reached the end of the pipeline and "
+                                + "was not handled correctly by the last handler.")
+                .isThrownBy(ch::checkException);
 
         // ...but all the registered channels should be notified.
         for (RemoteInputChannel r : rich) {
@@ -328,7 +318,7 @@ public class ClientTransportErrorHandlingTest {
      * instance of {@link RemoteTransportException}.
      */
     @Test
-    public void testConnectionResetByPeer() throws Throwable {
+    void testConnectionResetByPeer() throws Throwable {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -345,13 +335,13 @@ public class ClientTransportErrorHandlingTest {
                                 Throwable cause = (Throwable) invocation.getArguments()[0];
 
                                 try {
-                                    assertEquals(RemoteTransportException.class, cause.getClass());
-                                    assertNotEquals("Connection reset by peer", cause.getMessage());
+                                    assertThat(cause).isInstanceOf(RemoteTransportException.class);
+                                    assertThat(cause)
+                                            .hasMessageNotContaining("Connection reset by peer");
 
-                                    assertEquals(IOException.class, cause.getCause().getClass());
-                                    assertEquals(
-                                            "Connection reset by peer",
-                                            cause.getCause().getMessage());
+                                    assertThat(cause.getCause()).isInstanceOf(IOException.class);
+                                    assertThat(cause.getCause())
+                                            .hasMessage("Connection reset by peer");
                                 } catch (Throwable t) {
                                     error[0] = t;
                                 }
@@ -364,12 +354,12 @@ public class ClientTransportErrorHandlingTest {
 
         ch.pipeline().fireExceptionCaught(new IOException("Connection reset by peer"));
 
-        assertNull(error[0]);
+        assertThat(error[0]).isNull();
     }
 
     /** Verifies that the channel is closed if there is an error *during* error notification. */
     @Test
-    public void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
+    void testChannelClosedOnExceptionDuringErrorNotification() throws Exception {
         EmbeddedChannel ch = createEmbeddedChannel();
 
         NetworkClientHandler handler = getClientHandler(ch);
@@ -382,7 +372,7 @@ public class ClientTransportErrorHandlingTest {
 
         ch.pipeline().fireExceptionCaught(new Exception());
 
-        assertFalse(ch.isActive());
+        assertThat(ch.isActive()).isFalse();
     }
 
     // ---------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 968b824d541..7ebc2aed829 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
-import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
@@ -30,9 +30,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.net.InetAddress;
@@ -49,23 +48,16 @@ import java.util.concurrent.Future;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 
-/** {@link PartitionRequestClientFactory} test. */
-@RunWith(Parameterized.class)
-public class PartitionRequestClientFactoryTest extends TestLogger {
-    @Parameterized.Parameter public boolean connectionReuseEnabled;
+public class PartitionRequestClientFactoryTest {
+    private static final ResourceID RESOURCE_ID = ResourceID.generate();
 
-    @Parameterized.Parameters(name = "connection reuse enabled = {0}")
-    public static Object[] parameters() {
-        return new Object[][] {new Object[] {true}, new Object[] {false}};
-    }
-
-    @Test
-    public void testInterruptsNotCached() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testInterruptsNotCached(boolean connectionReuseEnabled) throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
         try {
             AwaitingNettyClient nettyClient =
@@ -74,10 +66,11 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     new PartitionRequestClientFactory(nettyClient, connectionReuseEnabled);
 
             nettyClient.awaitForInterrupts = true;
-            connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(0));
+            connectAndInterrupt(factory, nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
 
             nettyClient.awaitForInterrupts = false;
-            factory.createPartitionRequestClient(nettyServerAndClient.getConnectionID(0));
+            factory.createPartitionRequestClient(
+                    nettyServerAndClient.getConnectionID(RESOURCE_ID, 0));
         } finally {
             nettyServerAndClient.client().shutdown();
             nettyServerAndClient.server().shutdown();
@@ -106,8 +99,9 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         interrupted.get();
     }
 
-    @Test
-    public void testExceptionsAreNotCached() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testExceptionsAreNotCached(boolean connectionReuseEnabled) throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
 
         try {
@@ -116,13 +110,10 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                             new UnstableNettyClient(nettyServerAndClient.client(), 1),
                             connectionReuseEnabled);
 
-            final ConnectionID connectionID = nettyServerAndClient.getConnectionID(0);
-            try {
-                factory.createPartitionRequestClient(connectionID);
-                fail("Expected the first request to fail.");
-            } catch (RemoteTransportException expected) {
-                // expected
-            }
+            final ConnectionID connectionID = nettyServerAndClient.getConnectionID(RESOURCE_ID, 0);
+            assertThatThrownBy(() -> factory.createPartitionRequestClient(connectionID))
+                    .withFailMessage("Expected the first request to fail.")
+                    .isInstanceOf(RemoteTransportException.class);
 
             factory.createPartitionRequestClient(connectionID);
         } finally {
@@ -131,14 +122,15 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         }
     }
 
-    @Test
-    public void testReuseNettyPartitionRequestClient() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testReuseNettyPartitionRequestClient(boolean connectionReuseEnabled) throws Exception {
         NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
         try {
-            checkReuseNettyPartitionRequestClient(nettyServerAndClient, 1);
-            checkReuseNettyPartitionRequestClient(nettyServerAndClient, 2);
-            checkReuseNettyPartitionRequestClient(nettyServerAndClient, 5);
-            checkReuseNettyPartitionRequestClient(nettyServerAndClient, 10);
+            checkReuseNettyPartitionRequestClient(connectionReuseEnabled, nettyServerAndClient, 1);
+            checkReuseNettyPartitionRequestClient(connectionReuseEnabled, nettyServerAndClient, 2);
+            checkReuseNettyPartitionRequestClient(connectionReuseEnabled, nettyServerAndClient, 5);
+            checkReuseNettyPartitionRequestClient(connectionReuseEnabled, nettyServerAndClient, 10);
         } finally {
             nettyServerAndClient.client().shutdown();
             nettyServerAndClient.server().shutdown();
@@ -146,7 +138,9 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
     }
 
     private void checkReuseNettyPartitionRequestClient(
-            NettyTestUtil.NettyServerAndClient nettyServerAndClient, int maxNumberOfConnections)
+            boolean connectionReuseEnabled,
+            NettyTestUtil.NettyServerAndClient nettyServerAndClient,
+            int maxNumberOfConnections)
             throws Exception {
         final Set<NettyPartitionRequestClient> set = new HashSet<>();
 
@@ -158,18 +152,21 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                         connectionReuseEnabled);
         for (int i = 0; i < Math.max(100, maxNumberOfConnections); i++) {
             final ConnectionID connectionID =
-                    nettyServerAndClient.getConnectionID((int) (Math.random() * Integer.MAX_VALUE));
+                    nettyServerAndClient.getConnectionID(
+                            RESOURCE_ID, (int) (Math.random() * Integer.MAX_VALUE));
             set.add(factory.createPartitionRequestClient(connectionID));
         }
-        assertTrue(set.size() <= maxNumberOfConnections);
+        assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections);
     }
 
     /**
      * Verify that the netty client reuse when the netty server closes the channel and there is no
      * input channel.
      */
-    @Test
-    public void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testConnectionReuseWhenRemoteCloseAndNoInputChannel(boolean connectionReuseEnabled)
+            throws Exception {
         CompletableFuture<Void> inactiveFuture = new CompletableFuture<>();
         CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>();
         NettyProtocol protocol =
@@ -214,8 +211,9 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
         shutdown(serverAndClient);
     }
 
-    @Test
-    public void testNettyClientConnectRetry() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testNettyClientConnectRetry(boolean connectionReuseEnabled) throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =
                 new UnstableNettyClient(serverAndClient.client(), 2);
@@ -224,30 +222,42 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                 new PartitionRequestClientFactory(
                         unstableNettyClient, 2, 1, connectionReuseEnabled);
 
-        factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
+        factory.createPartitionRequestClient(serverAndClient.getConnectionID(RESOURCE_ID, 0));
 
         serverAndClient.client().shutdown();
         serverAndClient.server().shutdown();
     }
 
     // see https://issues.apache.org/jira/browse/FLINK-18821
-    @Test(expected = IOException.class)
-    public void testFailureReportedToSubsequentRequests() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testFailureReportedToSubsequentRequests(boolean connectionReuseEnabled) throws Exception {
         PartitionRequestClientFactory factory =
                 new PartitionRequestClientFactory(
                         new FailingNettyClient(), 2, 1, connectionReuseEnabled);
-        try {
-            factory.createPartitionRequestClient(
-                    new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
-        } catch (Exception e) {
-            // expected
-        }
-        factory.createPartitionRequestClient(
-                new ConnectionID(new InetSocketAddress(InetAddress.getLocalHost(), 8080), 0));
+
+        assertThatThrownBy(
+                () ->
+                        factory.createPartitionRequestClient(
+                                new ConnectionID(
+                                        ResourceID.generate(),
+                                        new InetSocketAddress(InetAddress.getLocalHost(), 8080),
+                                        0)));
+
+        assertThatThrownBy(
+                        () ->
+                                factory.createPartitionRequestClient(
+                                        new ConnectionID(
+                                                ResourceID.generate(),
+                                                new InetSocketAddress(
+                                                        InetAddress.getLocalHost(), 8080),
+                                                0)))
+                .isInstanceOf(IOException.class);
     }
 
-    @Test(expected = IOException.class)
-    public void testNettyClientConnectRetryFailure() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testNettyClientConnectRetryFailure(boolean connectionReuseEnabled) throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =
                 new UnstableNettyClient(serverAndClient.client(), 3);
@@ -257,16 +267,22 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     new PartitionRequestClientFactory(
                             unstableNettyClient, 2, 1, connectionReuseEnabled);
 
-            factory.createPartitionRequestClient(serverAndClient.getConnectionID(0));
-
+            assertThatThrownBy(
+                            () -> {
+                                factory.createPartitionRequestClient(
+                                        serverAndClient.getConnectionID(RESOURCE_ID, 0));
+                            })
+                    .isInstanceOf(IOException.class);
         } finally {
             serverAndClient.client().shutdown();
             serverAndClient.server().shutdown();
         }
     }
 
-    @Test
-    public void testNettyClientConnectRetryMultipleThread() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testNettyClientConnectRetryMultipleThread(boolean connectionReuseEnabled)
+            throws Exception {
         NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient();
         UnstableNettyClient unstableNettyClient =
                 new UnstableNettyClient(serverAndClient.client(), 2);
@@ -286,7 +302,8 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                                 try {
                                     client =
                                             factory.createPartitionRequestClient(
-                                                    serverAndClient.getConnectionID(0));
+                                                    serverAndClient.getConnectionID(
+                                                            RESOURCE_ID, 0));
                                 } catch (Exception e) {
                                     fail(e.getMessage());
                                 }
@@ -301,10 +318,10 @@ public class PartitionRequestClientFactoryTest extends TestLogger {
                     NettyPartitionRequestClient client;
                     try {
                         client = runnableFuture.get();
-                        assertNotNull(client);
+                        assertThat(client).isNotNull();
                     } catch (Exception e) {
                         System.out.println(e.getMessage());
-                        fail();
+                        fail(e.getMessage());
                     }
                 });
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index f98dd4e3ce9..aa8f4b77bda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;


[flink] 02/02: [FLINK-29639][runtime] Print resourceId of remote taskmanager when encountering a transport exception.

Posted by xt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7a4fb7c86fbb28bee63bba829ebdda7b886170e3
Author: Weijie Guo <re...@163.com>
AuthorDate: Wed Nov 16 21:24:23 2022 +0800

    [FLINK-29639][runtime] Print resourceId of remote taskmanager when encountering a transport exception.
    
    This closes #21358
    
    (cherry picked from commit 93c834be953f1336adb3ec5b5bf759a20e25eddf)
---
 .../flink/runtime/io/network/ConnectionID.java     | 22 +++++++++----
 .../runtime/io/network/NetworkClientHandler.java   |  2 ++
 .../CreditBasedPartitionRequestClientHandler.java  | 37 +++++++++++++++++++--
 .../network/netty/NettyPartitionRequestClient.java | 16 +++++++--
 .../netty/PartitionRequestClientFactory.java       |  4 +++
 .../runtime/shuffle/NettyShuffleDescriptor.java    | 38 +++++++++++++++-------
 .../ResultPartitionDeploymentDescriptorTest.java   |  5 +--
 .../runtime/deployment/ShuffleDescriptorTest.java  | 13 ++++++--
 .../netty/ClientTransportErrorHandlingTest.java    | 13 +++++---
 ...editBasedPartitionRequestClientHandlerTest.java |  4 +++
 .../netty/NettyPartitionRequestClientTest.java     |  4 ++-
 .../runtime/io/network/netty/NettyTestUtil.java    |  4 ++-
 .../netty/PartitionRequestClientFactoryTest.java   |  2 +-
 .../partition/consumer/InputChannelBuilder.java    |  3 +-
 .../util/NettyShuffleDescriptorBuilder.java        |  7 ++--
 15 files changed, 134 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
index 6cb0fa29f20..fb60340c673 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionID.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.IntermediateResult;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
@@ -43,18 +44,26 @@ public class ConnectionID implements Serializable {
 
     private final int connectionIndex;
 
+    private final ResourceID resourceID;
+
     public ConnectionID(TaskManagerLocation connectionInfo, int connectionIndex) {
         this(
+                connectionInfo.getResourceID(),
                 new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()),
                 connectionIndex);
     }
 
-    public ConnectionID(InetSocketAddress address, int connectionIndex) {
+    public ConnectionID(ResourceID resourceID, InetSocketAddress address, int connectionIndex) {
+        this.resourceID = checkNotNull(resourceID);
         this.address = checkNotNull(address);
         checkArgument(connectionIndex >= 0);
         this.connectionIndex = connectionIndex;
     }
 
+    public ResourceID getResourceID() {
+        return resourceID;
+    }
+
     public InetSocketAddress getAddress() {
         return address;
     }
@@ -75,15 +84,14 @@ public class ConnectionID implements Serializable {
         }
 
         final ConnectionID ra = (ConnectionID) other;
-        if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
-            return false;
-        }
-
-        return true;
+        return ra.getAddress().equals(address)
+                && ra.getConnectionIndex() == connectionIndex
+                && ra.getResourceID().equals(resourceID);
     }
 
     @Override
     public String toString() {
-        return address + " [" + connectionIndex + "]";
+        return String.format(
+                "%s (%s) [%s]", address, resourceID.getStringWithMetadata(), connectionIndex);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
index 18ae9e6d9a9..354da6c3a32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkClientHandler.java
@@ -39,6 +39,8 @@ public interface NetworkClientHandler extends ChannelHandler {
 
     void cancelRequestFor(InputChannelID inputChannelId);
 
+    void setConnectionId(ConnectionID connectionId);
+
     /**
      * Return whether there is channel error.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index fe008c1afed..63efdf117c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -43,6 +44,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Channel handler to read the messages of buffer response or error response from the producer, to
  * write and flush the unannounced credits for the producer.
@@ -74,6 +77,8 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
      */
     private volatile ChannelHandlerContext ctx;
 
+    private ConnectionID connectionID;
+
     // ------------------------------------------------------------------------
     // Input channel/receiver registration
     // ------------------------------------------------------------------------
@@ -125,6 +130,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                 new RemoteTransportException(
                         "Connection unexpectedly closed by remote task manager '"
                                 + remoteAddr
+                                + " [ "
+                                + connectionID.getResourceID().getStringWithMetadata()
+                                + " ] "
                                 + "'. "
                                 + "This might indicate that the remote task manager was lost.",
                         remoteAddr));
@@ -153,6 +161,9 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                         new RemoteTransportException(
                                 "Lost connection to task manager '"
                                         + remoteAddr
+                                        + " [ "
+                                        + connectionID.getResourceID().getStringWithMetadata()
+                                        + " ] "
                                         + "'. "
                                         + "This indicates that the remote task manager was lost.",
                                 remoteAddr,
@@ -162,7 +173,10 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                 tex =
                         new LocalTransportException(
                                 String.format(
-                                        "%s (connection to '%s')", cause.getMessage(), remoteAddr),
+                                        "%s (connection to '%s [%s]')",
+                                        cause.getMessage(),
+                                        remoteAddr,
+                                        connectionID.getResourceID().getStringWithMetadata()),
                                 localAddr,
                                 cause);
             }
@@ -208,6 +222,11 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
         return channelError.get() != null;
     }
 
+    @Override
+    public void setConnectionId(ConnectionID connectionId) {
+        this.connectionID = checkNotNull(connectionId);
+    }
+
     @Override
     public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
         writeAndFlushNextMessageIfPossible(ctx.channel());
@@ -283,7 +302,12 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
             if (error.isFatalError()) {
                 notifyAllChannelsOfErrorAndClose(
                         new RemoteTransportException(
-                                "Fatal error at remote task manager '" + remoteAddr + "'.",
+                                "Fatal error at remote task manager '"
+                                        + remoteAddr
+                                        + " [ "
+                                        + connectionID.getResourceID().getStringWithMetadata()
+                                        + " ] "
+                                        + "'.",
                                 remoteAddr,
                                 error.cause));
             } else {
@@ -295,7 +319,14 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                     } else {
                         inputChannel.onError(
                                 new RemoteTransportException(
-                                        "Error at remote task manager '" + remoteAddr + "'.",
+                                        "Error at remote task manager '"
+                                                + remoteAddr
+                                                + " [ "
+                                                + connectionID
+                                                        .getResourceID()
+                                                        .getStringWithMetadata()
+                                                + " ] "
+                                                + "'.",
                                         remoteAddr,
                                         error.cause));
                     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
index 52e1b68d39d..de2fc747ecb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClient.java
@@ -77,6 +77,7 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
         this.clientHandler = checkNotNull(clientHandler);
         this.connectionId = checkNotNull(connectionId);
         this.clientFactory = checkNotNull(clientFactory);
+        clientHandler.setConnectionId(connectionId);
     }
 
     boolean canBeDisposed() {
@@ -138,8 +139,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                             inputChannel.onError(
                                     new LocalTransportException(
                                             String.format(
-                                                    "Sending the partition request to '%s (#%d)' failed.",
+                                                    "Sending the partition request to '%s [%s] (#%d)' failed.",
                                                     connectionId.getAddress(),
+                                                    connectionId
+                                                            .getResourceID()
+                                                            .getStringWithMetadata(),
                                                     connectionId.getConnectionIndex()),
                                             future.channel().localAddress(),
                                             future.cause()));
@@ -197,8 +201,11 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
                                     inputChannel.onError(
                                             new LocalTransportException(
                                                     String.format(
-                                                            "Sending the task event to '%s (#%d)' failed.",
+                                                            "Sending the task event to '%s [%s] (#%d)' failed.",
                                                             connectionId.getAddress(),
+                                                            connectionId
+                                                                    .getResourceID()
+                                                                    .getStringWithMetadata(),
                                                             connectionId.getConnectionIndex()),
                                                     future.channel().localAddress(),
                                                     future.cause()));
@@ -275,7 +282,10 @@ public class NettyPartitionRequestClient implements PartitionRequestClient {
             final SocketAddress localAddr = tcpChannel.localAddress();
             final SocketAddress remoteAddr = tcpChannel.remoteAddress();
             throw new LocalTransportException(
-                    String.format("Channel to '%s' closed.", remoteAddr), localAddr);
+                    String.format(
+                            "Channel to '%s [%s]' closed.",
+                            remoteAddr, connectionId.getResourceID().getStringWithMetadata()),
+                    localAddr);
         }
     }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 7463cad9e70..ca92f052ffe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -80,6 +80,7 @@ class PartitionRequestClientFactory {
         // We map the input ConnectionID to a new value to restrict the number of tcp connections
         connectionId =
                 new ConnectionID(
+                        connectionId.getResourceID(),
                         connectionId.getAddress(),
                         connectionId.getConnectionIndex() % maxNumberOfConnections);
         while (true) {
@@ -164,6 +165,9 @@ class PartitionRequestClientFactory {
             throw new RemoteTransportException(
                     "Connecting to remote task manager '"
                             + connectionId.getAddress()
+                            + " [ "
+                            + connectionId.getResourceID().getStringWithMetadata()
+                            + " ] "
                             + "' has failed. This might indicate that the remote task "
                             + "manager has been lost.",
                     connectionId.getAddress(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index 9831e949d30..e73e305497f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -48,7 +48,10 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
     }
 
     public ConnectionID getConnectionId() {
-        return partitionConnectionInfo.getConnectionId();
+        return new ConnectionID(
+                producerLocation,
+                partitionConnectionInfo.getAddress(),
+                partitionConnectionInfo.getConnectionIndex());
     }
 
     @Override
@@ -66,9 +69,10 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
     }
 
     /** Information for connection to partition producer for shuffle exchange. */
-    @FunctionalInterface
     public interface PartitionConnectionInfo extends Serializable {
-        ConnectionID getConnectionId();
+        InetSocketAddress getAddress();
+
+        int getConnectionIndex();
     }
 
     /**
@@ -81,16 +85,22 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
 
         private static final long serialVersionUID = 5992534320110743746L;
 
-        private final ConnectionID connectionID;
+        private final InetSocketAddress address;
+
+        private final int connectionIndex;
 
         @VisibleForTesting
-        public NetworkPartitionConnectionInfo(ConnectionID connectionID) {
-            this.connectionID = connectionID;
+        public NetworkPartitionConnectionInfo(InetSocketAddress address, int connectionIndex) {
+            this.address = address;
+            this.connectionIndex = connectionIndex;
         }
 
-        @Override
-        public ConnectionID getConnectionId() {
-            return connectionID;
+        public InetSocketAddress getAddress() {
+            return address;
+        }
+
+        public int getConnectionIndex() {
+            return connectionIndex;
         }
 
         static NetworkPartitionConnectionInfo fromProducerDescriptor(
@@ -98,7 +108,7 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
             InetSocketAddress address =
                     new InetSocketAddress(
                             producerDescriptor.getAddress(), producerDescriptor.getDataPort());
-            return new NetworkPartitionConnectionInfo(new ConnectionID(address, connectionIndex));
+            return new NetworkPartitionConnectionInfo(address, connectionIndex);
         }
     }
 
@@ -111,7 +121,13 @@ public class NettyShuffleDescriptor implements ShuffleDescriptor {
         INSTANCE;
 
         @Override
-        public ConnectionID getConnectionId() {
+        public InetSocketAddress getAddress() {
+            throw new UnsupportedOperationException(
+                    "Local execution does not support shuffle connection.");
+        }
+
+        @Override
+        public int getConnectionIndex() {
             throw new UnsupportedOperationException(
                     "Local execution does not support shuffle connection.");
         }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
index 423512f7862..11450b717f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ResultPartitionDeploymentDescriptorTest.java
@@ -66,7 +66,8 @@ class ResultPartitionDeploymentDescriptorTest {
 
     private static final ResourceID producerLocation = new ResourceID("producerLocation");
     private static final InetSocketAddress address = new InetSocketAddress("localhost", 10000);
-    private static final ConnectionID connectionID = new ConnectionID(address, connectionIndex);
+    private static final ConnectionID connectionID =
+            new ConnectionID(producerLocation, address, connectionIndex);
 
     /** Tests simple de/serialization with {@link UnknownShuffleDescriptor}. */
     @Test
@@ -85,7 +86,7 @@ class ResultPartitionDeploymentDescriptorTest {
         ShuffleDescriptor shuffleDescriptor =
                 new NettyShuffleDescriptor(
                         producerLocation,
-                        new NetworkPartitionConnectionInfo(connectionID),
+                        new NetworkPartitionConnectionInfo(address, connectionIndex),
                         resultPartitionID);
 
         ResultPartitionDeploymentDescriptor copy =
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
index 89cc242dd3c..7bc645e054f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/ShuffleDescriptorTest.java
@@ -66,9 +66,10 @@ public class ShuffleDescriptorTest extends TestLogger {
                             jobID, localPartitionId, consumerResourceID);
 
             ResultPartitionID remotePartitionId = new ResultPartitionID();
+            ResourceID remoteResourceID = ResourceID.generate();
             ResultPartitionDeploymentDescriptor remotePartition =
                     createResultPartitionDeploymentDescriptor(
-                            jobID, remotePartitionId, ResourceID.generate());
+                            jobID, remotePartitionId, remoteResourceID);
 
             ResultPartitionID unknownPartitionId = new ResultPartitionID();
 
@@ -118,7 +119,15 @@ public class ShuffleDescriptorTest extends TestLogger {
                         remotePartitionId);
                 nettyShuffleDescriptor = (NettyShuffleDescriptor) remoteShuffleDescriptor;
                 assertThat(nettyShuffleDescriptor.isLocalTo(consumerResourceID), is(false));
-                assertThat(nettyShuffleDescriptor.getConnectionId(), is(STUB_CONNECTION_ID));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getAddress(),
+                        is(STUB_CONNECTION_ID.getAddress()));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getConnectionIndex(),
+                        is(STUB_CONNECTION_ID.getConnectionIndex()));
+                assertThat(
+                        nettyShuffleDescriptor.getConnectionId().getResourceID(),
+                        is(remoteResourceID));
             } else {
                 // Unknown (lazy deployment allowed)
                 verifyShuffleDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index 5830620d893..fae2163c97e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -45,6 +46,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +67,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 class ClientTransportErrorHandlingTest {
+    private static final ConnectionID CONNECTION_ID =
+            new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0);
 
     /**
      * Verifies that failed client requests via {@link PartitionRequestClient} are correctly
@@ -113,10 +117,7 @@ class ClientTransportErrorHandlingTest {
 
         PartitionRequestClient requestClient =
                 new NettyPartitionRequestClient(
-                        ch,
-                        handler,
-                        mock(ConnectionID.class),
-                        mock(PartitionRequestClientFactory.class));
+                        ch, handler, CONNECTION_ID, mock(PartitionRequestClientFactory.class));
 
         // Create input channels
         RemoteInputChannel[] rich =
@@ -396,7 +397,9 @@ class ClientTransportErrorHandlingTest {
     }
 
     private NetworkClientHandler getClientHandler(Channel ch) {
-        return ch.pipeline().get(NetworkClientHandler.class);
+        NetworkClientHandler networkClientHandler = ch.pipeline().get(NetworkClientHandler.class);
+        networkClientHandler.setConnectionId(CONNECTION_ID);
+        return networkClientHandler;
     }
 
     private RemoteInputChannel createRemoteInputChannel() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index f3074f9682b..9cab5927483 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
@@ -60,6 +61,7 @@ import org.junit.Assume;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import static org.apache.flink.runtime.io.network.netty.PartitionRequestQueueTest.blockChannel;
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createRemoteInputChannel;
@@ -652,6 +654,8 @@ public class CreditBasedPartitionRequestClientHandlerTest {
             Class<? extends TransportException> expectedClass, Exception cause) {
         CreditBasedPartitionRequestClientHandler handler =
                 new CreditBasedPartitionRequestClientHandler();
+        handler.setConnectionId(
+                new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 0), 0));
         EmbeddedChannel embeddedChannel =
                 new EmbeddedChannel(
                         // A test handler to trigger the exception.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
index f2b66c42490..6ede9dbdc90 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyPartitionRequestClientTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -287,7 +288,8 @@ public class NettyPartitionRequestClientTest {
         try (NetUtils.Port availablePort = NetUtils.getAvailablePort()) {
             int port = availablePort.getPort();
             ConnectionID connectionID =
-                    new ConnectionID(new InetSocketAddress("localhost", port), 0);
+                    new ConnectionID(
+                            ResourceID.generate(), new InetSocketAddress("localhost", port), 0);
             NettyConfig config =
                     new NettyConfig(InetAddress.getLocalHost(), port, 1024, 1, new Configuration());
             NettyClient nettyClient = new NettyClient(config);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 0ec9b5380da..5e4b5b68711 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.util.NetUtils;
 
@@ -224,8 +225,9 @@ public class NettyTestUtil {
             return client;
         }
 
-        ConnectionID getConnectionID(int connectionIndex) {
+        ConnectionID getConnectionID(ResourceID resourceID, int connectionIndex) {
             return new ConnectionID(
+                    resourceID,
                     new InetSocketAddress(
                             server.getConfig().getServerAddress(),
                             server.getConfig().getServerPort()),
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 7ebc2aed829..51f2a934275 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -199,7 +199,7 @@ public class PartitionRequestClientFactoryTest {
                 new PartitionRequestClientFactory(
                         serverAndClient.client(), 2, 1, connectionReuseEnabled);
 
-        ConnectionID connectionID = serverAndClient.getConnectionID(0);
+        ConnectionID connectionID = serverAndClient.getConnectionID(ResourceID.generate(), 0);
         NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID);
 
         // close server channel
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
index bb5d24a736b..622a1392a03 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
@@ -38,7 +39,7 @@ import static org.apache.flink.runtime.io.network.partition.consumer.SingleInput
 /** Builder for various {@link InputChannel} types. */
 public class InputChannelBuilder {
     public static final ConnectionID STUB_CONNECTION_ID =
-            new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+            new ConnectionID(ResourceID.generate(), new InetSocketAddress("localhost", 5000), 0);
 
     private int channelIndex = 0;
     private ResultPartitionID partitionId = new ResultPartitionID();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index aa8f4b77bda..f45d4842595 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -71,10 +71,11 @@ public class NettyShuffleDescriptorBuilder {
     }
 
     public NettyShuffleDescriptor buildRemote() {
-        ConnectionID connectionID =
-                new ConnectionID(new InetSocketAddress(address, dataPort), connectionIndex);
         return new NettyShuffleDescriptor(
-                producerLocation, new NetworkPartitionConnectionInfo(connectionID), id);
+                producerLocation,
+                new NetworkPartitionConnectionInfo(
+                        new InetSocketAddress(address, dataPort), connectionIndex),
+                id);
     }
 
     public NettyShuffleDescriptor buildLocal() {