You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/10/07 12:01:27 UTC
[flink] branch master updated: [FLINK-14310][runtime] Get
ExecutionVertexID from ExecutionVertex rather than creating new instances
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new d413750 [FLINK-14310][runtime] Get ExecutionVertexID from ExecutionVertex rather than creating new instances
d413750 is described below
commit d41375048df0050fae5d15f44b165a77ce896f1e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Oct 7 12:08:53 2019 +0800
[FLINK-14310][runtime] Get ExecutionVertexID from ExecutionVertex rather than creating new instances
---
.../failover/AdaptedRestartPipelinedRegionStrategyNG.java | 2 +-
.../executiongraph/failover/adapter/DefaultFailoverTopology.java | 3 +--
.../scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java | 2 +-
.../scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java | 2 +-
.../adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java | 4 ++--
5 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 164cd91..eb43a2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -268,7 +268,7 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
}
private ExecutionVertexID getExecutionVertexID(final ExecutionVertex vertex) {
- return new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex());
+ return vertex.getID();
}
private List<ExecutionVertex> sortVerticesTopologically(final Set<ExecutionVertex> vertices) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java
index 7f832f0..e27d213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import java.util.ArrayList;
import java.util.Collection;
@@ -56,7 +55,7 @@ public class DefaultFailoverTopology implements FailoverTopology {
final Map<ExecutionVertex, DefaultFailoverVertex> failoverVertexMap = new IdentityHashMap<>();
for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
final DefaultFailoverVertex failoverVertex = new DefaultFailoverVertex(
- new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()),
+ vertex.getID(),
vertex.getTaskNameWithSubtaskIndex());
this.failoverVertices.add(failoverVertex);
failoverVertexMap.put(vertex, failoverVertex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java
index 392f00e..96f7545 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java
@@ -57,7 +57,7 @@ public class ExecutionGraphToInputsLocationsRetrieverAdapter implements InputsLo
List<ExecutionVertexID> producers = new ArrayList<>(inputEdges.length);
for (ExecutionEdge inputEdge : inputEdges) {
ExecutionVertex producer = inputEdge.getSource().getProducer();
- producers.add(new ExecutionVertexID(producer.getJobvertexId(), producer.getParallelSubtaskIndex()));
+ producers.add(producer.getID());
}
resultPartitionProducers.add(producers);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java
index 9b377d8..94c835c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java
@@ -107,7 +107,7 @@ public class ExecutionGraphToSchedulingTopologyAdapter implements SchedulingTopo
List<DefaultSchedulingResultPartition> producedPartitions) {
DefaultSchedulingExecutionVertex schedulingVertex = new DefaultSchedulingExecutionVertex(
- new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()),
+ vertex.getID(),
producedPartitions,
new ExecutionStateSupplier(vertex),
vertex.getInputDependencyConstraint());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java
index 5a925da..ab72e2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java
@@ -181,7 +181,7 @@ public class ExecutionGraphToSchedulingTopologyAdapterTest extends TestLogger {
// since deep equality is verified later in the main loop
// this DOES rely on an implicit assumption that the vertices objects returned by the topology are
// identical to those stored in the partition
- ExecutionVertexID originalId = new ExecutionVertexID(originalConsumer.getJobvertexId(), originalConsumer.getParallelSubtaskIndex());
+ ExecutionVertexID originalId = originalConsumer.getID();
assertTrue(adaptedConsumers.stream().anyMatch(adaptedConsumer -> adaptedConsumer.getId().equals(originalId)));
}
}
@@ -203,7 +203,7 @@ public class ExecutionGraphToSchedulingTopologyAdapterTest extends TestLogger {
ExecutionVertex originalVertex,
SchedulingExecutionVertex adaptedVertex) {
assertEquals(
- new ExecutionVertexID(originalVertex.getJobvertexId(), originalVertex.getParallelSubtaskIndex()),
+ originalVertex.getID(),
adaptedVertex.getId());
assertEquals(originalVertex.getInputDependencyConstraint(), adaptedVertex.getInputDependencyConstraint());
}