You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2021/05/09 21:20:21 UTC

[flink] branch master updated: [FLINK-22609][runtime] Generalize AllVerticesIterator

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 625ef0f  [FLINK-22609][runtime] Generalize AllVerticesIterator
625ef0f is described below

commit 625ef0fac5a07a9cc0c034244c63613cd512d3ff
Author: Roc Marshal <64...@users.noreply.github.com>
AuthorDate: Mon May 10 05:20:02 2021 +0800

    [FLINK-22609][runtime] Generalize AllVerticesIterator
---
 .../executiongraph/AllVerticesIterator.java        | 13 +++---
 .../executiongraph/ArchivedExecutionGraph.java     | 47 +---------------------
 .../executiongraph/DefaultExecutionGraph.java      |  7 +---
 3 files changed, 9 insertions(+), 58 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
index 2ab70ce..6949686 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AllVerticesIterator.java
@@ -21,15 +21,16 @@ package org.apache.flink.runtime.executiongraph;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
 
-class AllVerticesIterator implements Iterator<ExecutionVertex> {
+class AllVerticesIterator<EV extends AccessExecutionVertex, EJV extends AccessExecutionJobVertex>
+        implements Iterator<EV> {
 
-    private final Iterator<ExecutionJobVertex> jobVertices;
+    private final Iterator<EJV> jobVertices;
 
-    private ExecutionVertex[] currVertices;
+    private EV[] currVertices;
 
     private int currPos;
 
-    public AllVerticesIterator(Iterator<ExecutionJobVertex> jobVertices) {
+    public AllVerticesIterator(Iterator<EJV> jobVertices) {
         this.jobVertices = jobVertices;
     }
 
@@ -43,7 +44,7 @@ class AllVerticesIterator implements Iterator<ExecutionVertex> {
                     currVertices = null;
                 }
             } else if (jobVertices.hasNext()) {
-                currVertices = jobVertices.next().getTaskVertices();
+                currVertices = (EV[]) jobVertices.next().getTaskVertices();
                 currPos = 0;
             } else {
                 return false;
@@ -52,7 +53,7 @@ class AllVerticesIterator implements Iterator<ExecutionVertex> {
     }
 
     @Override
-    public ExecutionVertex next() {
+    public EV next() {
         if (hasNext()) {
             return currVertices[currPos++];
         } else {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index b219993..c0b8795 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -212,7 +212,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
         return new Iterable<ArchivedExecutionVertex>() {
             @Override
             public Iterator<ArchivedExecutionVertex> iterator() {
-                return new AllVerticesIterator(getVerticesTopologically().iterator());
+                return new AllVerticesIterator<>(getVerticesTopologically().iterator());
             }
         };
     }
@@ -262,51 +262,6 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
         return Optional.ofNullable(checkpointStorageName);
     }
 
-    class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
-
-        private final Iterator<ArchivedExecutionJobVertex> jobVertices;
-
-        private ArchivedExecutionVertex[] currVertices;
-
-        private int currPos;
-
-        public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
-            this.jobVertices = jobVertices;
-        }
-
-        @Override
-        public boolean hasNext() {
-            while (true) {
-                if (currVertices != null) {
-                    if (currPos < currVertices.length) {
-                        return true;
-                    } else {
-                        currVertices = null;
-                    }
-                } else if (jobVertices.hasNext()) {
-                    currVertices = jobVertices.next().getTaskVertices();
-                    currPos = 0;
-                } else {
-                    return false;
-                }
-            }
-        }
-
-        @Override
-        public ArchivedExecutionVertex next() {
-            if (hasNext()) {
-                return currVertices[currPos++];
-            } else {
-                throw new NoSuchElementException();
-            }
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
     /**
      * Create a {@link ArchivedExecutionGraph} from the given {@link ExecutionGraph}.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index 26ac38f..9eb8965 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -639,12 +639,7 @@ public class DefaultExecutionGraph implements ExecutionGraph, InternalExecutionG
 
     @Override
     public Iterable<ExecutionVertex> getAllExecutionVertices() {
-        return new Iterable<ExecutionVertex>() {
-            @Override
-            public Iterator<ExecutionVertex> iterator() {
-                return new AllVerticesIterator(getVerticesTopologically().iterator());
-            }
-        };
+        return () -> new AllVerticesIterator<>(getVerticesTopologically().iterator());
     }
 
     @Override