You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/22 00:34:43 UTC

[incubator-nemo] branch reshaping updated: chkpt

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch reshaping
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/reshaping by this push:
     new ba034cc  chkpt
ba034cc is described below

commit ba034ccc4c805467c60d1c8ebdc21f0ebd8847fc
Author: John Yang <jo...@apache.org>
AuthorDate: Tue Jan 22 09:34:31 2019 +0900

    chkpt
---
 .../main/java/org/apache/nemo/common/ir/IRDAG.java |  3 +-
 .../CommonSubexpressionEliminationPass.java        | 99 +++++++++++-----------
 .../reshaping/LargeShuffleRelayReshapingPass.java  | 44 ----------
 .../compiletime/reshaping/LoopExtractionPass.java  | 19 +++--
 .../compiletime/reshaping/LoopOptimizations.java   | 92 ++++++++++----------
 5 files changed, 111 insertions(+), 146 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
index d91d674..a8d18bd 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/IRDAG.java
@@ -31,6 +31,7 @@ import org.apache.nemo.common.ir.vertex.system.SystemIRVertex;
 
 import java.util.List;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * An IRDAG object captures a high-level data processing application (e.g., Spark/Beam application).
@@ -135,7 +136,7 @@ public class IRDAG {
 
   ////////////////////////////////////////////////// "Un-safe" direct reshaping (semantic-preserving not guaranteed)
 
-  public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge> unsafeReshaping) {
+  public void unSafeDirectReshaping(final Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshaping) {
     this.dag = unsafeReshaping.apply(dag);
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
index 898690e..5f49638 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/CommonSubexpressionEliminationPass.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import org.apache.nemo.common.coder.DecoderFactory;
 import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.IRDAG;
 import org.apache.nemo.common.ir.edge.IREdge;
@@ -50,68 +51,69 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
+  public void optimize(final IRDAG inputDAG) {
     // find and collect vertices with equivalent transforms
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
     final Map<Transform, List<OperatorVertex>> operatorVerticesToBeMerged = new HashMap<>();
     final Map<OperatorVertex, Set<IREdge>> inEdges = new HashMap<>();
     final Map<OperatorVertex, Set<IREdge>> outEdges = new HashMap<>();
 
-    dag.topologicalDo(irVertex -> {
-      if (irVertex instanceof OperatorVertex) {
-        final OperatorVertex operatorVertex = (OperatorVertex) irVertex;
-        operatorVerticesToBeMerged.putIfAbsent(operatorVertex.getTransform(), new ArrayList<>());
-        operatorVerticesToBeMerged.get(operatorVertex.getTransform()).add(operatorVertex);
+    inputDAG.unSafeDirectReshaping(dag -> {
+      dag.topologicalDo(irVertex -> {
+        if (irVertex instanceof OperatorVertex) {
+          final OperatorVertex operatorVertex = (OperatorVertex) irVertex;
+          operatorVerticesToBeMerged.putIfAbsent(operatorVertex.getTransform(), new ArrayList<>());
+          operatorVerticesToBeMerged.get(operatorVertex.getTransform()).add(operatorVertex);
 
-        dag.getIncomingEdgesOf(operatorVertex).forEach(irEdge -> {
-          inEdges.putIfAbsent(operatorVertex, new HashSet<>());
-          inEdges.get(operatorVertex).add(irEdge);
-          if (irEdge.getSrc() instanceof OperatorVertex) {
-            final OperatorVertex source = (OperatorVertex) irEdge.getSrc();
-            outEdges.putIfAbsent(source, new HashSet<>());
-            outEdges.get(source).add(irEdge);
-          }
-        });
-      } else {
-        builder.addVertex(irVertex, dag);
-        dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
-          if (irEdge.getSrc() instanceof OperatorVertex) {
-            final OperatorVertex source = (OperatorVertex) irEdge.getSrc();
-            outEdges.putIfAbsent(source, new HashSet<>());
-            outEdges.get(source).add(irEdge);
-          } else {
-            builder.connectVertices(irEdge);
-          }
-        });
-      }
-    });
+          dag.getIncomingEdgesOf(operatorVertex).forEach(irEdge -> {
+            inEdges.putIfAbsent(operatorVertex, new HashSet<>());
+            inEdges.get(operatorVertex).add(irEdge);
+            if (irEdge.getSrc() instanceof OperatorVertex) {
+              final OperatorVertex source = (OperatorVertex) irEdge.getSrc();
+              outEdges.putIfAbsent(source, new HashSet<>());
+              outEdges.get(source).add(irEdge);
+            }
+          });
+        } else {
+          builder.addVertex(irVertex, dag);
+          dag.getIncomingEdgesOf(irVertex).forEach(irEdge -> {
+            if (irEdge.getSrc() instanceof OperatorVertex) {
+              final OperatorVertex source = (OperatorVertex) irEdge.getSrc();
+              outEdges.putIfAbsent(source, new HashSet<>());
+              outEdges.get(source).add(irEdge);
+            } else {
+              builder.connectVertices(irEdge);
+            }
+          });
+        }
+      });
 
-    // merge them if they are not dependent on each other, and add IRVertices to the builder.
-    operatorVerticesToBeMerged.forEach(((transform, operatorVertices) -> {
-      final Map<Set<IRVertex>, List<OperatorVertex>> verticesToBeMergedWithIdenticalSources = new HashMap<>();
+      // merge them if they are not dependent on each other, and add IRVertices to the builder.
+      operatorVerticesToBeMerged.forEach(((transform, operatorVertices) -> {
+        final Map<Set<IRVertex>, List<OperatorVertex>> verticesToBeMergedWithIdenticalSources = new HashMap<>();
 
-      operatorVertices.forEach(operatorVertex -> {
-        // compare if incoming vertices are identical.
-        final Set<IRVertex> incomingVertices = dag.getIncomingEdgesOf(operatorVertex).stream().map(IREdge::getSrc)
+        operatorVertices.forEach(operatorVertex -> {
+          // compare if incoming vertices are identical.
+          final Set<IRVertex> incomingVertices = dag.getIncomingEdgesOf(operatorVertex).stream().map(IREdge::getSrc)
             .collect(Collectors.toSet());
-        if (verticesToBeMergedWithIdenticalSources.keySet().stream()
+          if (verticesToBeMergedWithIdenticalSources.keySet().stream()
             .anyMatch(lst -> lst.containsAll(incomingVertices) && incomingVertices.containsAll(lst))) {
-          final Set<IRVertex> foundKey = verticesToBeMergedWithIdenticalSources.keySet().stream()
+            final Set<IRVertex> foundKey = verticesToBeMergedWithIdenticalSources.keySet().stream()
               .filter(vs -> vs.containsAll(incomingVertices) && incomingVertices.containsAll(vs))
               .findFirst().get();
-          verticesToBeMergedWithIdenticalSources.get(foundKey).add(operatorVertex);
-        } else {
-          verticesToBeMergedWithIdenticalSources.putIfAbsent(incomingVertices, new ArrayList<>());
-          verticesToBeMergedWithIdenticalSources.get(incomingVertices).add(operatorVertex);
-        }
-      });
+            verticesToBeMergedWithIdenticalSources.get(foundKey).add(operatorVertex);
+          } else {
+            verticesToBeMergedWithIdenticalSources.putIfAbsent(incomingVertices, new ArrayList<>());
+            verticesToBeMergedWithIdenticalSources.get(incomingVertices).add(operatorVertex);
+          }
+        });
 
-      verticesToBeMergedWithIdenticalSources.values().forEach(ovs ->
+        verticesToBeMergedWithIdenticalSources.values().forEach(ovs ->
           mergeAndAddToBuilder(ovs, builder, dag, inEdges, outEdges));
-    }));
+      }));
 
-    // process IREdges
-    operatorVerticesToBeMerged.values().forEach(operatorVertices ->
+      // process IREdges
+      operatorVerticesToBeMerged.values().forEach(operatorVertices ->
         operatorVertices.forEach(operatorVertex -> {
           inEdges.getOrDefault(operatorVertex, new HashSet<>()).forEach(e -> {
             if (builder.contains(operatorVertex) && builder.contains(e.getSrc())) {
@@ -125,7 +127,8 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
           });
         }));
 
-    return builder.build();
+      return builder.build();
+    });
   }
 
   /**
@@ -137,7 +140,7 @@ public final class CommonSubexpressionEliminationPass extends ReshapingPass {
    * @param outEdges outgoing edges information.
    */
   private static void mergeAndAddToBuilder(final List<OperatorVertex> ovs, final DAGBuilder<IRVertex, IREdge> builder,
-                                           final IRDAG dag,
+                                           final DAG<IRVertex, IREdge> dag,
                                            final Map<OperatorVertex, Set<IREdge>> inEdges,
                                            final Map<OperatorVertex, Set<IREdge>> outEdges) {
     if (ovs.size() > 0) {
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
index e9ce8e2..47aef26 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LargeShuffleRelayReshapingPass.java
@@ -19,13 +19,8 @@
 package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
 import org.apache.nemo.common.ir.IRDAG;
-import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.system.StreamVertex;
-import org.apache.nemo.common.ir.vertex.transform.RelayTransform;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 /**
@@ -53,43 +48,4 @@ public final class LargeShuffleRelayReshapingPass extends ReshapingPass {
       });
     });
   }
-
-  /*
-  @Override
-  public void optimize(final IRDAG dag) {
-    dag.topologicalDo(v -> {
-      builder.addVertex(v);
-      // We care about OperatorVertices that have any incoming edge that
-      // has Shuffle as data communication pattern.
-      if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
-        CommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
-        dag.getIncomingEdgesOf(v).forEach(edge -> {
-          if (CommunicationPatternProperty.Value.Shuffle
-            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-            // Insert a merger vertex having transform that write received data immediately
-            // before the vertex receiving shuffled data.
-            final OperatorVertex iFileMergerVertex = new OperatorVertex(new RelayTransform());
-
-            builder.addVertex(iFileMergerVertex);
-            final IREdge newEdgeToMerger =
-              new IREdge(CommunicationPatternProperty.Value.Shuffle, edge.getSrc(), iFileMergerVertex);
-            edge.copyExecutionPropertiesTo(newEdgeToMerger);
-            final IREdge newEdgeFromMerger = new IREdge(CommunicationPatternProperty.Value.OneToOne,
-              iFileMergerVertex, v);
-            newEdgeFromMerger.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
-            newEdgeFromMerger.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
-            builder.connectVertices(newEdgeToMerger);
-            builder.connectVertices(newEdgeFromMerger);
-          } else {
-            builder.connectVertices(edge);
-          }
-        });
-      } else { // Others are simply added to the builder.
-        dag.getIncomingEdgesOf(v).forEach(builder::connectVertices);
-      }
-    });
-    return builder.build();
-  }
-  */
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
index 756a027..637036a 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopExtractionPass.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
+import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.IRDAG;
 import org.apache.nemo.common.ir.edge.IREdge;
@@ -49,9 +50,11 @@ public final class LoopExtractionPass extends ReshapingPass {
   }
 
   @Override
-  public void optimize(final IRDAG dag) {
-    final Integer maxStackDepth = this.findMaxLoopVertexStackDepth(dag);
-    return groupLoops(dag, maxStackDepth);
+  public void optimize(final IRDAG inputDAG) {
+    inputDAG.unSafeDirectReshaping(dag -> {
+      final Integer maxStackDepth = this.findMaxLoopVertexStackDepth(dag);
+      return groupLoops(dag, maxStackDepth);
+    });
   }
 
   /**
@@ -60,7 +63,7 @@ public final class LoopExtractionPass extends ReshapingPass {
    * @return The maximum stack depth of the DAG.
    * @throws Exception exceptions through the way.
    */
-  private Integer findMaxLoopVertexStackDepth(final IRDAG dag) {
+  private Integer findMaxLoopVertexStackDepth(final DAG<IRVertex, IREdge> dag) {
     final OptionalInt maxDepth = dag.getVertices().stream().filter(dag::isCompositeVertex)
         .mapToInt(dag::getLoopStackDepthOf)
         .max();
@@ -76,7 +79,7 @@ public final class LoopExtractionPass extends ReshapingPass {
    * @return processed DAG.
    * @throws Exception exceptions through the way.
    */
-  private IRDAG groupLoops(final IRDAG dag, final Integer depth) {
+  private DAG<IRVertex, IREdge> groupLoops(final DAG<IRVertex, IREdge> dag, final Integer depth) {
     if (depth <= 0) {
       return dag;
     } else {
@@ -143,7 +146,7 @@ public final class LoopExtractionPass extends ReshapingPass {
    * @param dstVertex the destination vertex that belongs to a certain loop.
    * @param assignedLoopVertex the loop that dstVertex belongs to.
    */
-  private static void connectElementToLoop(final IRDAG dag, final DAGBuilder<IRVertex, IREdge> builder,
+  private static void connectElementToLoop(final DAG<IRVertex, IREdge> dag, final DAGBuilder<IRVertex, IREdge> builder,
                                            final IRVertex dstVertex, final LoopVertex assignedLoopVertex) {
     assignedLoopVertex.getBuilder().addVertex(dstVertex, dag);
 
@@ -178,7 +181,7 @@ public final class LoopExtractionPass extends ReshapingPass {
    * @return Processed DAG.
    * @throws Exception exceptions through the way.
    */
-  private IRDAG loopRolling(final IRDAG dag) {
+  private DAG<IRVertex, IREdge> loopRolling(final DAG<IRVertex, IREdge> dag) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
     final HashMap<LoopVertex, LoopVertex> loopVerticesOfSameLoop = new HashMap<>();
     final HashMap<LoopVertex, HashMap<IRVertex, IRVertex>> equivalentVerticesOfLoops = new HashMap<>();
@@ -275,7 +278,7 @@ public final class LoopExtractionPass extends ReshapingPass {
    * @param dag DAG to observe the incoming edges of the vertex.
    * @param loopVerticesOfSameLoop List that keeps track of the iterations of the identical loop.
    */
-  private static void addVertexToBuilder(final DAGBuilder<IRVertex, IREdge> builder, final IRDAG dag,
+  private static void addVertexToBuilder(final DAGBuilder<IRVertex, IREdge> builder, final DAG<IRVertex, IREdge> dag,
                                          final IRVertex irVertex,
                                          final Map<LoopVertex, LoopVertex> loopVerticesOfSameLoop) {
     builder.addVertex(irVertex, dag);
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
index 2e18fd9..8cae875 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/LoopOptimizations.java
@@ -240,67 +240,69 @@ public final class LoopOptimizations {
     }
 
     @Override
-    public void optimize(final IRDAG dag) {
-      final List<LoopVertex> loopVertices = new ArrayList<>();
-      final Map<LoopVertex, List<IREdge>> inEdges = new HashMap<>();
-      final Map<LoopVertex, List<IREdge>> outEdges = new HashMap<>();
-      final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
+    public void optimize(final IRDAG inputDAG) {
+      inputDAG.unSafeDirectReshaping(dag -> {
+        final List<LoopVertex> loopVertices = new ArrayList<>();
+        final Map<LoopVertex, List<IREdge>> inEdges = new HashMap<>();
+        final Map<LoopVertex, List<IREdge>> outEdges = new HashMap<>();
+        final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
 
-      collectLoopVertices(dag, loopVertices, inEdges, outEdges, builder);
+        collectLoopVertices(dag, loopVertices, inEdges, outEdges, builder);
 
-      // Refactor those with same data scan / operation, without dependencies in the loop.
-      loopVertices.forEach(loopVertex -> {
-        final List<Map.Entry<IRVertex, Set<IREdge>>> candidates = loopVertex.getNonIterativeIncomingEdges().entrySet()
+        // Refactor those with same data scan / operation, without dependencies in the loop.
+        loopVertices.forEach(loopVertex -> {
+          final List<Map.Entry<IRVertex, Set<IREdge>>> candidates = loopVertex.getNonIterativeIncomingEdges().entrySet()
             .stream().filter(entry ->
-                loopVertex.getDAG().getIncomingEdgesOf(entry.getKey()).size() == 0 // no internal inEdges
-                    // no external inEdges
-                    && loopVertex.getIterativeIncomingEdges().getOrDefault(entry.getKey(), new HashSet<>()).size() == 0)
+              loopVertex.getDAG().getIncomingEdgesOf(entry.getKey()).size() == 0 // no internal inEdges
+                // no external inEdges
+                && loopVertex.getIterativeIncomingEdges().getOrDefault(entry.getKey(), new HashSet<>()).size() == 0)
             .collect(Collectors.toList());
-        candidates.forEach(candidate -> {
-          // add refactored vertex to builder.
-          builder.addVertex(candidate.getKey());
-          // connect incoming edges.
-          candidate.getValue().forEach(builder::connectVertices);
-          // connect outgoing edges.
-          loopVertex.getDAG().getOutgoingEdgesOf(candidate.getKey()).forEach(loopVertex::addDagIncomingEdge);
-          loopVertex.getDAG().getOutgoingEdgesOf(candidate.getKey()).forEach(loopVertex::addNonIterativeIncomingEdge);
-          // modify incoming edges of loopVertex.
-          final List<IREdge> edgesToRemove = new ArrayList<>();
-          final List<IREdge> edgesToAdd = new ArrayList<>();
-          inEdges.getOrDefault(loopVertex, new ArrayList<>()).stream().filter(e ->
+          candidates.forEach(candidate -> {
+            // add refactored vertex to builder.
+            builder.addVertex(candidate.getKey());
+            // connect incoming edges.
+            candidate.getValue().forEach(builder::connectVertices);
+            // connect outgoing edges.
+            loopVertex.getDAG().getOutgoingEdgesOf(candidate.getKey()).forEach(loopVertex::addDagIncomingEdge);
+            loopVertex.getDAG().getOutgoingEdgesOf(candidate.getKey()).forEach(loopVertex::addNonIterativeIncomingEdge);
+            // modify incoming edges of loopVertex.
+            final List<IREdge> edgesToRemove = new ArrayList<>();
+            final List<IREdge> edgesToAdd = new ArrayList<>();
+            inEdges.getOrDefault(loopVertex, new ArrayList<>()).stream().filter(e ->
               // filter edges that have their sources as the refactored vertices.
               candidate.getValue().stream().map(IREdge::getSrc).anyMatch(edgeSrc -> edgeSrc.equals(e.getSrc())))
               .forEach(edge -> {
                 edgesToRemove.add(edge);
                 final IREdge newEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
-                    candidate.getKey(), edge.getDst());
+                  candidate.getKey(), edge.getDst());
                 newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
                 newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
                 edgesToAdd.add(newEdge);
               });
-          final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
-          listToModify.removeAll(edgesToRemove);
-          listToModify.addAll(edgesToAdd);
-          // clear garbage.
-          loopVertex.getBuilder().removeVertex(candidate.getKey());
-          loopVertex.getDagIncomingEdges().remove(candidate.getKey());
-          loopVertex.getNonIterativeIncomingEdges().remove(candidate.getKey());
+            final List<IREdge> listToModify = inEdges.getOrDefault(loopVertex, new ArrayList<>());
+            listToModify.removeAll(edgesToRemove);
+            listToModify.addAll(edgesToAdd);
+            // clear garbage.
+            loopVertex.getBuilder().removeVertex(candidate.getKey());
+            loopVertex.getDagIncomingEdges().remove(candidate.getKey());
+            loopVertex.getNonIterativeIncomingEdges().remove(candidate.getKey());
+          });
         });
-      });
 
-      // Add LoopVertices.
-      loopVertices.forEach(loopVertex -> {
-        builder.addVertex(loopVertex);
-        inEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(builder::connectVertices);
-        outEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(builder::connectVertices);
-      });
+        // Add LoopVertices.
+        loopVertices.forEach(loopVertex -> {
+          builder.addVertex(loopVertex);
+          inEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(builder::connectVertices);
+          outEdges.getOrDefault(loopVertex, new ArrayList<>()).forEach(builder::connectVertices);
+        });
 
-      final IRDAG newDag = builder.build();
-      if (dag.getVertices().size() == newDag.getVertices().size()) {
-        return newDag;
-      } else {
-        return apply(newDag);
-      }
+        final IRDAG newDag = builder.build();
+        if (dag.getVertices().size() == newDag.getVertices().size()) {
+          return newDag;
+        } else {
+          return apply(newDag);
+        }
+      });
     }
   }
 }