Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/01/04 15:23:56 UTC

[GitHub] [flink] rkhachatryan commented on a change in pull request #14528: [FLINK-20615] Clean PartitionRequestClientFactory up if createPartitionRequestClient fails

rkhachatryan commented on a change in pull request #14528:
URL: https://github.com/apache/flink/pull/14528#discussion_r551382429



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
##########
@@ -69,36 +68,34 @@
     NettyPartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
             throws IOException, InterruptedException {
         while (true) {
-            AtomicBoolean isTheFirstOne = new AtomicBoolean(false);
-            CompletableFuture<NettyPartitionRequestClient> clientFuture =
-                    clients.computeIfAbsent(
-                            connectionId,
-                            unused -> {
-                                isTheFirstOne.set(true);
-                                return new CompletableFuture<>();
-                            });
-            if (isTheFirstOne.get()) {
+            final CompletableFuture<NettyPartitionRequestClient> newClientFuture =
+                    new CompletableFuture<>();
+
+            final CompletableFuture<NettyPartitionRequestClient> clientFuture =
+                    clients.putIfAbsent(connectionId, newClientFuture);
+
+            final NettyPartitionRequestClient client;
+
+            if (clientFuture == null) {
                 try {
-                    clientFuture.complete(connectWithRetries(connectionId));
-                } catch (InterruptedException e) {
-                    clientFuture.complete(null); // let others waiting know that they should retry
+                    client = connectWithRetries(connectionId);
+                } catch (RemoteTransportException | InterruptedException e) {

Review comment:
       Thanks!
   
   I just spotted another related issue: the exception caught here is re-thrown below (at line 86) without being wrapped.
   
   Previously this was fine because `InterruptedException` was the only exception caught at this point (other exceptions were wrapped after `future.get`).
   Leaving it unwrapped instead of wrapping it into an `IOException` can break the logic in `RestartPipelinedRegionFailoverStrategy.getTasksNeedingRestart`.
   WDYT?
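One way to address this is sketched below. It is a minimal, simplified sketch, not the code of PR #14528. It assumes that every non-interrupt failure of the connect step should surface as an `IOException`, both to the caller that owns the connection attempt and to callers already waiting on its future. `ClientCacheSketch`, `Connector` and `getOrCreate` are hypothetical names. `Connector.connect` stands in for `connectWithRetries`, and a `String` plays the role of `NettyPartitionRequestClient`.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

// Hypothetical, simplified stand-in for a client cache keyed by connection id.
// Not Flink code: a String plays the role of NettyPartitionRequestClient.
final class ClientCacheSketch {

    /** Hypothetical connect step, standing in for connectWithRetries. */
    @FunctionalInterface
    interface Connector {
        String connect(String connectionId) throws Exception;
    }

    private final ConcurrentMap<String, CompletableFuture<String>> clients =
            new ConcurrentHashMap<>();
    private final Connector connector;

    ClientCacheSketch(Connector connector) {
        this.connector = connector;
    }

    String getOrCreate(String connectionId) throws IOException, InterruptedException {
        final CompletableFuture<String> newClientFuture = new CompletableFuture<>();
        final CompletableFuture<String> clientFuture =
                clients.putIfAbsent(connectionId, newClientFuture);

        if (clientFuture != null) {
            // Another caller owns the connection attempt: wait for its outcome.
            try {
                return clientFuture.get();
            } catch (ExecutionException e) {
                // The owner fails the future with an IOException (see below).
                Throwable cause = e.getCause();
                throw cause instanceof IOException
                        ? (IOException) cause
                        : new IOException("Could not create client.", cause);
            }
        }

        final String client;
        try {
            client = connector.connect(connectionId);
        } catch (Exception e) {
            // Wrap the failure so every caller sees an IOException.
            final IOException wrapped = new IOException("Could not create client.", e);
            // Drop the failed entry first so that a later call can retry.
            clients.remove(connectionId, newClientFuture);
            // Callers blocked in get() receive the wrapped IOException.
            newClientFuture.completeExceptionally(wrapped);
            if (e instanceof InterruptedException) {
                // Interruption still propagates unchanged to this caller.
                throw (InterruptedException) e;
            }
            throw wrapped;
        }
        newClientFuture.complete(client);
        return client;
    }
}
```

Removing the entry before failing the future means a later call starts a fresh attempt instead of reusing the failed one. Unlike the `while (true)` loop in the diff, waiters here do not retry when the owner is interrupted. They receive an `IOException` whose cause is the `InterruptedException`. That is a simplification made for the sketch.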




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org