You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/02 16:02:01 UTC

[flink] branch release-1.14 updated: [FLINK-24919][runtime] Getting vertex only under synchronization

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new d26c0e5  [FLINK-24919][runtime] Getting vertex only under synchronization
d26c0e5 is described below

commit d26c0e511e9f37671b52c23df4c09e7aa3719d5a
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Nov 29 13:23:14 2021 +0100

    [FLINK-24919][runtime] Getting vertex only under synchronization
---
 .../ExecutionAttemptMappingProvider.java           | 39 ++++++++++++----------
 1 file changed, 22 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
index a7f7f83..f74804b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -38,32 +37,38 @@ public class ExecutionAttemptMappingProvider {
     private final List<ExecutionVertex> tasks;
 
     /** The cached mapping, which would only be updated on miss. */
-    private final LinkedHashMap<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
+    private Map<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
 
     public ExecutionAttemptMappingProvider(Iterable<ExecutionVertex> tasksIterable) {
         this.tasks = new ArrayList<>();
         tasksIterable.forEach(this.tasks::add);
 
-        this.cachedTasksById =
-                new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasks.size()) {
-
-                    @Override
-                    protected boolean removeEldestEntry(
-                            Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
-                        return size() > tasks.size();
-                    }
-                };
+        this.cachedTasksById = new HashMap<>(tasks.size());
     }
 
     public Optional<ExecutionVertex> getVertex(ExecutionAttemptID id) {
-        if (!cachedTasksById.containsKey(id)) {
-            cachedTasksById.putAll(getCurrentAttemptMappings());
-            if (!cachedTasksById.containsKey(id)) {
-                // the task probably gone after a restart
-                cachedTasksById.put(id, null);
+        ExecutionVertex vertex = cachedTasksById.get(id);
+        if (vertex != null || cachedTasksById.containsKey(id)) {
+            return Optional.ofNullable(vertex);
+        }
+
+        return updateAndGet(id);
+    }
+
+    private Optional<ExecutionVertex> updateAndGet(ExecutionAttemptID id) {
+        synchronized (tasks) {
+            ExecutionVertex vertex = cachedTasksById.get(id);
+            if (vertex != null || cachedTasksById.containsKey(id)) {
+                return Optional.ofNullable(vertex);
+            }
+
+            Map<ExecutionAttemptID, ExecutionVertex> mappings = getCurrentAttemptMappings();
+            if (!mappings.containsKey(id)) {
+                mappings.put(id, null);
             }
+            cachedTasksById = mappings;
+            return Optional.ofNullable(cachedTasksById.get(id));
         }
-        return Optional.ofNullable(cachedTasksById.get(id));
     }
 
     private Map<ExecutionAttemptID, ExecutionVertex> getCurrentAttemptMappings() {