You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/07/02 15:08:16 UTC
[flink] 02/05: [hotfix][runtime] Use Set instead of IdentityHashMap where possible
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit bce9a4c43fb6ff61762885cb0fbc984a7338ce19
Author: Gary Yao <ga...@apache.org>
AuthorDate: Thu Jun 20 17:20:00 2019 +0200
[hotfix][runtime] Use Set instead of IdentityHashMap where possible
Replace instances where we use an IdentityHashMap as a set with
Collections.newSetFromMap() in RestartPipelinedRegionStrategy and
PipelinedRegionComputeUtil.
---
.../failover/flip1/PipelinedRegionComputeUtil.java | 11 ++++------
.../flip1/RestartPipelinedRegionStrategy.java | 25 +++++++++++-----------
2 files changed, 17 insertions(+), 19 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
index 291c894..b21b52f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/PipelinedRegionComputeUtil.java
@@ -22,6 +22,7 @@ package org.apache.flink.runtime.executiongraph.failover.flip1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
@@ -41,8 +42,6 @@ public final class PipelinedRegionComputeUtil {
return uniqueRegions(buildOneRegionForAllVertices(topology));
}
- // we use the map (list -> null) to imitate an IdentityHashSet (which does not exist)
- // this helps to optimize the building performance as it uses reference equality
final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion = new IdentityHashMap<>();
// iterate all the vertices which are topologically sorted
@@ -105,11 +104,9 @@ public final class PipelinedRegionComputeUtil {
private static Set<Set<FailoverVertex>> uniqueRegions(final Map<FailoverVertex, Set<FailoverVertex>> vertexToRegion) {
// find out all the distinct regions
- final IdentityHashMap<Set<FailoverVertex>, Object> distinctRegions = new IdentityHashMap<>();
- for (Set<FailoverVertex> regionVertices : vertexToRegion.values()) {
- distinctRegions.put(regionVertices, null);
- }
- return distinctRegions.keySet();
+ final Set<Set<FailoverVertex>> distinctRegions = Collections.newSetFromMap(new IdentityHashMap<>());
+ distinctRegions.addAll(vertexToRegion.values());
+ return distinctRegions;
}
private PipelinedRegionComputeUtil() {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
index 58d1675..a87764b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/RestartPipelinedRegionStrategy.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayDeque;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
@@ -50,7 +51,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
private final FailoverTopology topology;
/** All failover regions. */
- private final IdentityHashMap<FailoverRegion, Object> regions;
+ private final Set<FailoverRegion> regions;
/** Maps execution vertex id to failover region. */
private final Map<ExecutionVertexID, FailoverRegion> vertexToRegionMap;
@@ -80,7 +81,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
ResultPartitionAvailabilityChecker resultPartitionAvailabilityChecker) {
this.topology = checkNotNull(topology);
- this.regions = new IdentityHashMap<>();
+ this.regions = Collections.newSetFromMap(new IdentityHashMap<>());
this.vertexToRegionMap = new HashMap<>();
this.resultPartitionAvailabilityChecker = new RegionFailoverResultPartitionAvailabilityChecker(
resultPartitionAvailabilityChecker);
@@ -100,7 +101,7 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
for (Set<FailoverVertex> regionVertices : distinctRegions) {
LOG.debug("Creating a failover region with {} vertices.", regionVertices.size());
final FailoverRegion failoverRegion = new FailoverRegion(regionVertices);
- regions.put(failoverRegion, null);
+ regions.add(failoverRegion);
for (FailoverVertex vertex : regionVertices) {
vertexToRegionMap.put(vertex.getExecutionVertexID(), failoverRegion);
}
@@ -171,26 +172,26 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
* 3. If a region is involved, all of its consumer regions are involved
*/
private Set<FailoverRegion> getRegionsToRestart(FailoverRegion failedRegion) {
- IdentityHashMap<FailoverRegion, Object> regionsToRestart = new IdentityHashMap<>();
- IdentityHashMap<FailoverRegion, Object> visitedRegions = new IdentityHashMap<>();
+ Set<FailoverRegion> regionsToRestart = Collections.newSetFromMap(new IdentityHashMap<>());
+ Set<FailoverRegion> visitedRegions = Collections.newSetFromMap(new IdentityHashMap<>());
// start from the failed region to visit all involved regions
Queue<FailoverRegion> regionsToVisit = new ArrayDeque<>();
- visitedRegions.put(failedRegion, null);
+ visitedRegions.add(failedRegion);
regionsToVisit.add(failedRegion);
while (!regionsToVisit.isEmpty()) {
FailoverRegion regionToRestart = regionsToVisit.poll();
// an involved region should be restarted
- regionsToRestart.put(regionToRestart, null);
+ regionsToRestart.add(regionToRestart);
// if a needed input result partition is not available, its producer region is involved
for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
for (FailoverEdge inEdge : vertex.getInputEdges()) {
if (!resultPartitionAvailabilityChecker.isAvailable(inEdge.getResultPartitionID())) {
FailoverRegion producerRegion = vertexToRegionMap.get(inEdge.getSourceVertex().getExecutionVertexID());
- if (!visitedRegions.containsKey(producerRegion)) {
- visitedRegions.put(producerRegion, null);
+ if (!visitedRegions.contains(producerRegion)) {
+ visitedRegions.add(producerRegion);
regionsToVisit.add(producerRegion);
}
}
@@ -201,15 +202,15 @@ public class RestartPipelinedRegionStrategy implements FailoverStrategy {
for (FailoverVertex vertex : regionToRestart.getAllExecutionVertices()) {
for (FailoverEdge outEdge : vertex.getOutputEdges()) {
FailoverRegion consumerRegion = vertexToRegionMap.get(outEdge.getTargetVertex().getExecutionVertexID());
- if (!visitedRegions.containsKey(consumerRegion)) {
- visitedRegions.put(consumerRegion, null);
+ if (!visitedRegions.contains(consumerRegion)) {
+ visitedRegions.add(consumerRegion);
regionsToVisit.add(consumerRegion);
}
}
}
}
- return regionsToRestart.keySet();
+ return regionsToRestart;
}
// ------------------------------------------------------------------------