You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/09 21:20:21 UTC
[flink] branch master updated: [FLINK-22609][runtime] Generalize
AllVerticesIterator
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 625ef0f [FLINK-22609][runtime] Generalize AllVerticesIterator
625ef0f is described below
commit 625ef0fac5a07a9cc0c034244c63613cd512d3ff
Author: Roc Marshal <64...@users.noreply.github.com>
AuthorDate: Mon May 10 05:20:02 2021 +0800
[FLINK-22609][runtime] Generalize AllVerticesIterator
---
.../executiongraph/AllVerticesIterator.java | 13 +++---
.../executiongraph/ArchivedExecutionGraph.java | 47 +---------------------
.../executiongraph/DefaultExecutionGraph.java | 7 +---
3 files changed, 9 insertions(+), 58 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
index 2ab70ce..6949686 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
@@ -21,15 +21,16 @@ package org.apache.flink.runtime.executiongraph;
import java.util.Iterator;
import java.util.NoSuchElementException;
-class AllVerticesIterator implements Iterator<ExecutionVertex> {
+class AllVerticesIterator<EV extends AccessExecutionVertex, EJV extends AccessExecutionJobVertex>
+ implements Iterator<EV> {
- private final Iterator<ExecutionJobVertex> jobVertices;
+ private final Iterator<EJV> jobVertices;
- private ExecutionVertex[] currVertices;
+ private EV[] currVertices;
private int currPos;
- public AllVerticesIterator(Iterator<ExecutionJobVertex> jobVertices) {
+ public AllVerticesIterator(Iterator<EJV> jobVertices) {
this.jobVertices = jobVertices;
}
@@ -43,7 +44,7 @@ class AllVerticesIterator implements Iterator<ExecutionVertex> {
currVertices = null;
}
} else if (jobVertices.hasNext()) {
- currVertices = jobVertices.next().getTaskVertices();
+ currVertices = (EV[]) jobVertices.next().getTaskVertices();
currPos = 0;
} else {
return false;
@@ -52,7 +53,7 @@ class AllVerticesIterator implements Iterator<ExecutionVertex> {
}
@Override
- public ExecutionVertex next() {
+ public EV next() {
if (hasNext()) {
return currVertices[currPos++];
} else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index b219993..c0b8795 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -212,7 +212,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
return new Iterable<ArchivedExecutionVertex>() {
@Override
public Iterator<ArchivedExecutionVertex> iterator() {
- return new AllVerticesIterator(getVerticesTopologically().iterator());
+ return new AllVerticesIterator<>(getVerticesTopologically().iterator());
}
};
}
@@ -262,51 +262,6 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
return Optional.ofNullable(checkpointStorageName);
}
- class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
-
- private final Iterator<ArchivedExecutionJobVertex> jobVertices;
-
- private ArchivedExecutionVertex[] currVertices;
-
- private int currPos;
-
- public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
- this.jobVertices = jobVertices;
- }
-
- @Override
- public boolean hasNext() {
- while (true) {
- if (currVertices != null) {
- if (currPos < currVertices.length) {
- return true;
- } else {
- currVertices = null;
- }
- } else if (jobVertices.hasNext()) {
- currVertices = jobVertices.next().getTaskVertices();
- currPos = 0;
- } else {
- return false;
- }
- }
- }
-
- @Override
- public ArchivedExecutionVertex next() {
- if (hasNext()) {
- return currVertices[currPos++];
- } else {
- throw new NoSuchElementException();
- }
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
- }
-
/**
* Create a {@link ArchivedExecutionGraph} from the given {@link ExecutionGraph}.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 26ac38f..9eb8965 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -639,12 +639,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
@Override
public Iterable<ExecutionVertex> getAllExecutionVertices() {
- return new Iterable<ExecutionVertex>() {
- @Override
- public Iterator<ExecutionVertex> iterator() {
- return new AllVerticesIterator(getVerticesTopologically().iterator());
- }
- };
+ return () -> new AllVerticesIterator<>(getVerticesTopologically().iterator());
}
@Override