You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2023/01/27 09:15:01 UTC

[flink] branch master updated (f6c7c30118e -> 952c7831e74)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from f6c7c30118e [FLINK-30709][runtime] NetworkInput#emitNext() should push records to DataOutput within a while loop
     new 2b5fe306114 [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.
     new 399bcef57db [FLINK-26082][runtime] Refactor ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest according to `initServerAndClient` method changes.
     new 952c7831e74 [hotfix][runtime] Check style ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest fix

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../netty/ClientTransportErrorHandlingTest.java    | 84 ++++++++++------------
 .../runtime/io/network/netty/NettyTestUtil.java    | 20 +++++-
 .../netty/ServerTransportErrorHandlingTest.java    | 56 ++++-----------
 3 files changed, 69 insertions(+), 91 deletions(-)


[flink] 02/03: [FLINK-26082][runtime] Refactor ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest according to `initServerAndClient` method changes.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 399bcef57db394d57e188cc646c95f2b1d3a4890
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Wed Jan 25 17:19:09 2023 +0100

    [FLINK-26082][runtime] Refactor ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest according to `initServerAndClient` method changes.
---
 .../netty/ClientTransportErrorHandlingTest.java    |  5 ++---
 .../netty/ServerTransportErrorHandlingTest.java    | 26 +---------------------
 2 files changed, 3 insertions(+), 28 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index fae2163c97e..c8ae23cac08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -51,7 +51,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
-import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -89,7 +88,7 @@ class ClientTransportErrorHandlingTest {
 
         // We need a real server and client in this test, because Netty's EmbeddedChannel is
         // not failing the ChannelPromise of failed writes.
-        NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
+        NettyServerAndClient serverAndClient = initServerAndClient(protocol);
 
         Channel ch = connect(serverAndClient);
 
@@ -238,7 +237,7 @@ class ClientTransportErrorHandlingTest {
                     }
                 };
 
-        NettyServerAndClient serverAndClient = initServerAndClient(protocol, createConfig());
+        NettyServerAndClient serverAndClient = initServerAndClient(protocol);
 
         Channel ch = connect(serverAndClient);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index b922eee236d..55985168ac7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
@@ -37,12 +36,10 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import java.net.BindException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
-import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.createConfig;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.junit.Assert.fail;
@@ -52,7 +49,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ServerTransportErrorHandlingTest {
-    private static final int NETTY_INIT_MAX_RETRY_TIMES = 20;
 
     /** Verifies remote closes trigger the release of all resources. */
     @Test
@@ -104,27 +100,7 @@ public class ServerTransportErrorHandlingTest {
         NettyTestUtil.NettyServerAndClient serverAndClient = null;
 
         try {
-            for (int retry = 0; retry < NETTY_INIT_MAX_RETRY_TIMES; retry++) {
-                try {
-                    serverAndClient = initServerAndClient(protocol, createConfig());
-                    break;
-                } catch (Exception e) {
-                    if (retry >= NETTY_INIT_MAX_RETRY_TIMES - 1) {
-                        throw new RuntimeException(
-                                "Failed to initialize netty server and client, retried "
-                                        + retry
-                                        + " times.",
-                                e);
-                    }
-                    if (e instanceof BindException
-                            || ExceptionUtils.findThrowableWithMessage(e, "Address already in use")
-                                    .isPresent()) {
-                        continue;
-                    }
-                    throw e;
-                }
-            }
-
+            serverAndClient = initServerAndClient(protocol);
             Channel ch = connect(serverAndClient);
 
             // Write something to trigger close by server


[flink] 01/03: [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b5fe306114cbe116f2576392374f4bd84c89e41
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Wed Jan 25 17:17:25 2023 +0100

    [FLINK-26082][runtime] Initializing test netty server and client in the loop to avoid the probability of `Address already in use` problem.
---
 .../runtime/io/network/netty/NettyTestUtil.java      | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
index 5e4b5b68711..c5bdcb85b86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java
@@ -27,6 +27,7 @@ import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
+import java.net.BindException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.concurrent.TimeUnit;
@@ -35,6 +36,7 @@ import java.util.function.Function;
 import static junit.framework.TestCase.assertEquals;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import static org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
+import static org.apache.flink.util.ExceptionUtils.findThrowableWithMessage;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertTrue;
@@ -96,7 +98,23 @@ public class NettyTestUtil {
     }
 
     static NettyServerAndClient initServerAndClient(NettyProtocol protocol) throws Exception {
-        return initServerAndClient(protocol, createConfig());
+        // It is possible that between checking a port available and binding to the port something
+        // takes this port. So we initialize a server in the loop to decrease the probability of it.
+        int attempts = 42; // The arbitrary number of attempts to avoid an infinity loop.
+        while (true) {
+            try {
+                return initServerAndClient(protocol, createConfig());
+            } catch (Exception ex) {
+                if (!(ex instanceof BindException)
+                        && !findThrowableWithMessage(ex, "Address already in use").isPresent()) {
+                    throw ex;
+                }
+
+                if (attempts-- < 0) {
+                    throw new Exception("Failed to initialize netty server and client", ex);
+                }
+            }
+        }
     }
 
     static NettyServerAndClient initServerAndClient(NettyProtocol protocol, NettyConfig config)


[flink] 03/03: [hotfix][runtime] Check style ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest fix

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 952c7831e7411201edea7fdf205b481c0ae26a8e
Author: Anton Kalashnikov <ak...@gmail.com>
AuthorDate: Thu Jan 26 17:56:30 2023 +0100

    [hotfix][runtime] Check style ServerTransportErrorHandlingTest and ClientTransportErrorHandlingTest fix
---
 .../netty/ClientTransportErrorHandlingTest.java    | 79 ++++++++++------------
 .../netty/ServerTransportErrorHandlingTest.java    | 30 ++++----
 2 files changed, 47 insertions(+), 62 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
index c8ae23cac08..47d312f5ffa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ClientTransportErrorHandlingTest.java
@@ -42,7 +42,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
 
 import org.junit.jupiter.api.Test;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
@@ -55,8 +54,8 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServer
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNoException;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
@@ -102,9 +101,7 @@ class ClientTransportErrorHandlingTest {
 
                             @Override
                             public void write(
-                                    ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
-                                    throws Exception {
-
+                                    ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                                 if (writeNum >= 1) {
                                     throw new RuntimeException("Expected test exception.");
                                 }
@@ -124,17 +121,15 @@ class ClientTransportErrorHandlingTest {
 
         final CountDownLatch sync = new CountDownLatch(1);
 
-        // Do this with explicit synchronization. Otherwise this is not robust against slow timings
+        // Do this with explicit synchronization. Otherwise, this is not robust against slow timings
         // of the callback (e.g. we cannot just verify that it was called once, because there is
         // a chance that we do this too early).
         doAnswer(
-                        new Answer<Void>() {
-                            @Override
-                            public Void answer(InvocationOnMock invocation) throws Throwable {
-                                sync.countDown();
-                                return null;
-                            }
-                        })
+                        (Answer<Void>)
+                                invocation -> {
+                                    sync.countDown();
+                                    return null;
+                                })
                 .when(rich[1])
                 .onError(isA(LocalTransportException.class));
 
@@ -143,7 +138,7 @@ class ClientTransportErrorHandlingTest {
 
         // Second request is *not* successful
         requestClient.requestSubpartition(new ResultPartitionID(), 0, rich[1], 0);
-        // Wait for the notification and it could confirm all the request operations are done
+        // Wait for the notification, and it could confirm all the request operations are done
         assertThat(sync.await(TestingUtils.TESTING_DURATION.toMillis(), TimeUnit.MILLISECONDS))
                 .withFailMessage(
                         "Timed out after waiting for "
@@ -227,9 +222,7 @@ class ClientTransportErrorHandlingTest {
                             // Close on read
                             new ChannelInboundHandlerAdapter() {
                                 @Override
-                                public void channelRead(ChannelHandlerContext ctx, Object msg)
-                                        throws Exception {
-
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                     ctx.channel().close();
                                 }
                             }
@@ -250,12 +243,9 @@ class ClientTransportErrorHandlingTest {
         final CountDownLatch sync = new CountDownLatch(rich.length);
 
         Answer<Void> countDownLatch =
-                new Answer<Void>() {
-                    @Override
-                    public Void answer(InvocationOnMock invocation) throws Throwable {
-                        sync.countDown();
-                        return null;
-                    }
+                invocation -> {
+                    sync.countDown();
+                    return null;
                 };
 
         for (RemoteInputChannel r : rich) {
@@ -329,26 +319,27 @@ class ClientTransportErrorHandlingTest {
 
         // Verify the Exception
         doAnswer(
-                        new Answer<Void>() {
-                            @Override
-                            public Void answer(InvocationOnMock invocation) throws Throwable {
-                                Throwable cause = (Throwable) invocation.getArguments()[0];
-
-                                try {
-                                    assertThat(cause).isInstanceOf(RemoteTransportException.class);
-                                    assertThat(cause)
-                                            .hasMessageNotContaining("Connection reset by peer");
-
-                                    assertThat(cause.getCause()).isInstanceOf(IOException.class);
-                                    assertThat(cause.getCause())
-                                            .hasMessage("Connection reset by peer");
-                                } catch (Throwable t) {
-                                    error[0] = t;
-                                }
-
-                                return null;
-                            }
-                        })
+                        (Answer<Void>)
+                                invocation -> {
+                                    Throwable cause = (Throwable) invocation.getArguments()[0];
+
+                                    try {
+                                        assertThat(cause)
+                                                .isInstanceOf(RemoteTransportException.class);
+                                        assertThat(cause)
+                                                .hasMessageNotContaining(
+                                                        "Connection reset by peer");
+
+                                        assertThat(cause.getCause())
+                                                .isInstanceOf(IOException.class);
+                                        assertThat(cause.getCause())
+                                                .hasMessage("Connection reset by peer");
+                                    } catch (Throwable t) {
+                                        error[0] = t;
+                                    }
+
+                                    return null;
+                                })
                 .when(rich)
                 .onError(any(Throwable.class));
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
index 55985168ac7..c8faf422fc0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/ServerTransportErrorHandlingTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.netty.CancelPartitionRequestTest.InfiniteSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -33,7 +34,6 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 
 import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.concurrent.CountDownLatch;
@@ -43,8 +43,8 @@ import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.connect;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient;
 import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown;
 import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -64,18 +64,14 @@ public class ServerTransportErrorHandlingTest {
                         anyInt(),
                         any(BufferAvailabilityListener.class)))
                 .thenAnswer(
-                        new Answer<ResultSubpartitionView>() {
-                            @Override
-                            public ResultSubpartitionView answer(InvocationOnMock invocationOnMock)
-                                    throws Throwable {
-                                BufferAvailabilityListener listener =
-                                        (BufferAvailabilityListener)
-                                                invocationOnMock.getArguments()[2];
-                                listener.notifyDataAvailable();
-                                return new CancelPartitionRequestTest.InfiniteSubpartitionView(
-                                        outboundBuffers, sync);
-                            }
-                        });
+                        (Answer<ResultSubpartitionView>)
+                                invocationOnMock -> {
+                                    BufferAvailabilityListener listener =
+                                            (BufferAvailabilityListener)
+                                                    invocationOnMock.getArguments()[2];
+                                    listener.notifyDataAvailable();
+                                    return new InfiniteSubpartitionView(outboundBuffers, sync);
+                                });
 
         NettyProtocol protocol =
                 new NettyProtocol(partitionManager, mock(TaskEventDispatcher.class)) {
@@ -87,9 +83,7 @@ public class ServerTransportErrorHandlingTest {
                             // Close on read
                             new ChannelInboundHandlerAdapter() {
                                 @Override
-                                public void channelRead(ChannelHandlerContext ctx, Object msg)
-                                        throws Exception {
-
+                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                     ctx.channel().close();
                                 }
                             }