You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:08:16 UTC

[flink] 02/05: [hotfix][runtime] Use Set instead of IdentityHashMap where possible

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bce9a4c43fb6ff61762885cb0fbc984a7338ce19
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jun 20 17:20:00 2019 +0200

    [hotfix][runtime] Use Set instead of IdentityHashMap where possible
    
    Replace instances where we use an IdentityHashMap as a set with
    Collections.newSetFromMap() in RestartPipelinedRegionStrategy and
    PipelinedRegionComputeUtil.
---
 .../failover/flip1/PipelinedRegionComputeUtil.java | 11 ++++------
 .../flip1/RestartPipelinedRegionStrategy.java      | 25 +++++++++++-----------
 2 files changed, 17 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 291c894..b21b52f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Map;
@@ -41,8 +42,6 @@ public final class PipelinedRegionComputeUtil {
 			return uniqueRegions(buildOneRegionForAllVertices(topology));
 		}
 
-		// we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
-		// this helps to optimize the building performance as it uses reference equality
 		final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
 
 		// iterate all the vertices which are topologically sorted
@@ -105,11 +104,9 @@ public final class PipelinedRegionComputeUtil {
 
 	private static Set<Set<FailoverVertex>> uniqueRegions(final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) {
 		// find out all the distinct regions
-		final IdentityHashMap<Set<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
-		for (Set<FailoverVertex> regionVertices : vertexToRegion.values()) {
-			distinctRegions.put(regionVertices, null);
-		}
-		return distinctRegions.keySet();
+		final Set<Set<FailoverVertex>> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>());
+		distinctRegions.addAll(vertexToRegion.values());
+		return distinctRegions;
 	}
 
 	private PipelinedRegionComputeUtil() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index 58d1675..a87764b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
@@ -50,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	private final FailoverTopology topology;
 
 	/** All failover regions. */
-	private final IdentityHashMap<FailoverRegion, Object> regions;
+	private final Set<FailoverRegion> regions;
 
 	/** Maps execution vertex id to failover region. */
 	private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
@@ -80,7 +81,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 		ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
 
 		this.topology = checkNotNull(topology);
-		this.regions = new IdentityHashMap<>();
+		this.regions = Collections.newSetFromMap(new IdentityHashMap<>());
 		this.vertexToRegionMap = new HashMap<>();
 		this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(
 			resultPartitionAvailabilityChecker);
@@ -100,7 +101,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 		for (Set<FailoverVertex> regionVertices : distinctRegions) {
 			LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
 			final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
-			regions.put(failoverRegion, null);
+			regions.add(failoverRegion);
 			for (FailoverVertex vertex : regionVertices) {
 				vertexToRegionMap.put(vertex.getExecutionVertexID(), failoverRegion);
 			}
@@ -171,26 +172,26 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 	 * 3. If a region is involved, all of its consumer regions are involved
 	 */
 	private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
-		IdentityHashMap<FailoverRegion, Object> regionsToRestart = new IdentityHashMap<>();
-		IdentityHashMap<FailoverRegion, Object> visitedRegions = new IdentityHashMap<>();
+		Set<FailoverRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>());
+		Set<FailoverRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>());
 
 		// start from the failed region to visit all involved regions
 		Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
-		visitedRegions.put(failedRegion, null);
+		visitedRegions.add(failedRegion);
 		regionsToVisit.add(failedRegion);
 		while (!regionsToVisit.isEmpty()) {
 			FailoverRegion regionToRestart = regionsToVisit.poll();
 
 			// an involved region should be restarted
-			regionsToRestart.put(regionToRestart, null);
+			regionsToRestart.add(regionToRestart);
 
 			// if a needed input result partition is not available, its producer region is involved
 			for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
 				for (FailoverEdge inEdge : vertex.getInputEdges()) {
 					if (!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID())) {
 						FailoverRegion producerRegion = vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
-						if (!visitedRegions.containsKey(producerRegion)) {
-							visitedRegions.put(producerRegion, null);
+						if (!visitedRegions.contains(producerRegion)) {
+							visitedRegions.add(producerRegion);
 							regionsToVisit.add(producerRegion);
 						}
 					}
@@ -201,15 +202,15 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
 			for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
 				for (FailoverEdge outEdge : vertex.getOutputEdges()) {
 					FailoverRegion consumerRegion = vertexToRegionMap.get(outEdge.getTargetVertex().getExecutionVertexID());
-					if (!visitedRegions.containsKey(consumerRegion)) {
-						visitedRegions.put(consumerRegion, null);
+					if (!visitedRegions.contains(consumerRegion)) {
+						visitedRegions.add(consumerRegion);
 						regionsToVisit.add(consumerRegion);
 					}
 				}
 			}
 		}
 
-		return regionsToRestart.keySet();
+		return regionsToRestart;
 	}
 
 	// ------------------------------------------------------------------------