You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/10/07 12:01:27 UTC

[flink] branch master updated: [FLINK-14310][runtime] Get ExecutionVertexID from ExecutionVertex rather than creating new instances

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d413750  [FLINK-14310][runtime] Get ExecutionVertexID from ExecutionVertex rather than creating new instances
d413750 is described below

commit d41375048df0050fae5d15f44b165a77ce896f1e
Author: Zhu Zhu <re...@gmail.com>
AuthorDate: Mon Oct 7 12:08:53 2019 +0800

    [FLINK-14310][runtime] Get ExecutionVertexID from ExecutionVertex rather than creating new instances
---
 .../failover/AdaptedRestartPipelinedRegionStrategyNG.java             | 2 +-
 .../executiongraph/failover/adapter/DefaultFailoverTopology.java      | 3 +--
 .../scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java    | 2 +-
 .../scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java  | 2 +-
 .../adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java        | 4 ++--
 5 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
index 164cd91..eb43a2c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/AdaptedRestartPipelinedRegionStrategyNG.java
@@ -268,7 +268,7 @@ public class AdaptedRestartPipelinedRegionStrategyNG extends FailoverStrategy {
 	}
 
 	private ExecutionVertexID getExecutionVertexID(final ExecutionVertex vertex) {
-		return new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex());
+		return vertex.getID();
 	}
 
 	private List<ExecutionVertex> sortVerticesTopologically(final Set<ExecutionVertex> vertices) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java
index 7f832f0..e27d213 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/adapter/DefaultFailoverTopology.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverTopology;
 import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverVertex;
-import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -56,7 +55,7 @@ public class DefaultFailoverTopology implements FailoverTopology {
 		final Map<ExecutionVertex, DefaultFailoverVertex> failoverVertexMap = new IdentityHashMap<>();
 		for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
 			final DefaultFailoverVertex failoverVertex = new DefaultFailoverVertex(
-				new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()),
+				vertex.getID(),
 				vertex.getTaskNameWithSubtaskIndex());
 			this.failoverVertices.add(failoverVertex);
 			failoverVertexMap.put(vertex, failoverVertex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java
index 392f00e..96f7545 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphToInputsLocationsRetrieverAdapter.java
@@ -57,7 +57,7 @@ public class ExecutionGraphToInputsLocationsRetrieverAdapter implements InputsLo
 			List<ExecutionVertexID> producers = new ArrayList<>(inputEdges.length);
 			for (ExecutionEdge inputEdge : inputEdges) {
 				ExecutionVertex producer = inputEdge.getSource().getProducer();
-				producers.add(new ExecutionVertexID(producer.getJobvertexId(), producer.getParallelSubtaskIndex()));
+				producers.add(producer.getID());
 			}
 			resultPartitionProducers.add(producers);
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java
index 9b377d8..94c835c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapter.java
@@ -107,7 +107,7 @@ public class ExecutionGraphToSchedulingTopologyAdapter implements SchedulingTopo
 		List<DefaultSchedulingResultPartition> producedPartitions) {
 
 		DefaultSchedulingExecutionVertex schedulingVertex = new DefaultSchedulingExecutionVertex(
-			new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()),
+			vertex.getID(),
 			producedPartitions,
 			new ExecutionStateSupplier(vertex),
 			vertex.getInputDependencyConstraint());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java
index 5a925da..ab72e2c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/ExecutionGraphToSchedulingTopologyAdapterTest.java
@@ -181,7 +181,7 @@ public class ExecutionGraphToSchedulingTopologyAdapterTest extends TestLogger {
 				// since deep equality is verified later in the main loop
 				// this DOES rely on an implicit assumption that the vertices objects returned by the topology are
 				// identical to those stored in the partition
-				ExecutionVertexID originalId = new ExecutionVertexID(originalConsumer.getJobvertexId(), originalConsumer.getParallelSubtaskIndex());
+				ExecutionVertexID originalId = originalConsumer.getID();
 				assertTrue(adaptedConsumers.stream().anyMatch(adaptedConsumer -> adaptedConsumer.getId().equals(originalId)));
 			}
 		}
@@ -203,7 +203,7 @@ public class ExecutionGraphToSchedulingTopologyAdapterTest extends TestLogger {
 		ExecutionVertex originalVertex,
 		SchedulingExecutionVertex adaptedVertex) {
 		assertEquals(
-			new ExecutionVertexID(originalVertex.getJobvertexId(), originalVertex.getParallelSubtaskIndex()),
+			originalVertex.getID(),
 			adaptedVertex.getId());
 		assertEquals(originalVertex.getInputDependencyConstraint(), adaptedVertex.getInputDependencyConstraint());
 	}