You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2021/12/02 16:02:01 UTC
[flink] branch release-1.14 updated: [FLINK-24919][runtime] Getting vertex only under synchronization
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new d26c0e5 [FLINK-24919][runtime] Getting vertex only under synchronization
d26c0e5 is described below
commit d26c0e511e9f37671b52c23df4c09e7aa3719d5a
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Mon Nov 29 13:23:14 2021 +0100
[FLINK-24919][runtime] Getting vertex only under synchronization
---
.../ExecutionAttemptMappingProvider.java | 39 ++++++++++++----------
1 file changed, 22 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
index a7f7f83..f74804b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ExecutionAttemptMappingProvider.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -38,32 +37,38 @@ public class ExecutionAttemptMappingProvider {
private final List<ExecutionVertex> tasks;
/** The cached mapping, which would only be updated on miss. */
- private final LinkedHashMap<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
+ private Map<ExecutionAttemptID, ExecutionVertex> cachedTasksById;
public ExecutionAttemptMappingProvider(Iterable<ExecutionVertex> tasksIterable) {
this.tasks = new ArrayList<>();
tasksIterable.forEach(this.tasks::add);
- this.cachedTasksById =
- new LinkedHashMap<ExecutionAttemptID, ExecutionVertex>(tasks.size()) {
-
- @Override
- protected boolean removeEldestEntry(
- Map.Entry<ExecutionAttemptID, ExecutionVertex> eldest) {
- return size() > tasks.size();
- }
- };
+ this.cachedTasksById = new HashMap<>(tasks.size());
}
public Optional<ExecutionVertex> getVertex(ExecutionAttemptID id) {
- if (!cachedTasksById.containsKey(id)) {
- cachedTasksById.putAll(getCurrentAttemptMappings());
- if (!cachedTasksById.containsKey(id)) {
- // the task probably gone after a restart
- cachedTasksById.put(id, null);
+ ExecutionVertex vertex = cachedTasksById.get(id);
+ if (vertex != null || cachedTasksById.containsKey(id)) {
+ return Optional.ofNullable(vertex);
+ }
+
+ return updateAndGet(id);
+ }
+
+ private Optional<ExecutionVertex> updateAndGet(ExecutionAttemptID id) {
+ synchronized (tasks) {
+ ExecutionVertex vertex = cachedTasksById.get(id);
+ if (vertex != null || cachedTasksById.containsKey(id)) {
+ return Optional.ofNullable(vertex);
+ }
+
+ Map<ExecutionAttemptID, ExecutionVertex> mappings = getCurrentAttemptMappings();
+ if (!mappings.containsKey(id)) {
+ mappings.put(id, null);
}
+ cachedTasksById = mappings;
+ return Optional.ofNullable(cachedTasksById.get(id));
}
- return Optional.ofNullable(cachedTasksById.get(id));
}
private Map<ExecutionAttemptID, ExecutionVertex> getCurrentAttemptMappings() {