Posted to notifications@nemo.apache.org by GitBox <gi...@apache.org> on 2020/06/25 06:44:19 UTC

[GitHub] [incubator-nemo] johnyangk commented on a change in pull request #293: [NEMO-436] (WIP) Dynamic re-configuration based on the Sampled Metric Data

johnyangk commented on a change in pull request #293:
URL: https://github.com/apache/incubator-nemo/pull/293#discussion_r445329368



##########
File path: runtime/common/src/main/proto/ControlMessage.proto
##########
@@ -146,14 +146,20 @@ message BlockStateChangedMsg {
 
 message RunTimePassMessageMsg {
     required string taskId = 1;
-    repeated RunTimePassMessageEntry entry = 2;
+    required RunTimePassType runTimePassType = 2;
+    repeated RunTimePassMessageEntry entry = 3;
 }
 
 message RunTimePassMessageEntry {
     required string key = 1;
     required int64 value = 2;
 }
 
+enum RunTimePassType {

Review comment:
       Please remove

##########
File path: runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchSchedulerUtils.java
##########
@@ -253,9 +253,15 @@ static void onTaskExecutionFailedRecoverable(final PlanStateManager planStateMan
    * @param data             of the message.
    */
   public static void onRunTimePassMessage(final PlanStateManager planStateManager, final PlanRewriter planRewriter,
+                                          final ControlMessage.RunTimePassType runTimePassType,
                                           final String taskId, final Object data) {
     final Set<StageEdge> targetEdges = BatchSchedulerUtils.getEdgesToOptimize(planStateManager, taskId);
-    planRewriter.accumulate(BatchSchedulerUtils.getMessageId(targetEdges), data);
+    if (runTimePassType.equals(ControlMessage.RunTimePassType.DynamicTaskSizingPass)) {

Review comment:
       Please remove

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java
##########
@@ -101,34 +130,45 @@ public PhysicalPlan rewrite(final PhysicalPlan currentPhysicalPlan, final int me
     // Optimize using the Message
     final Message message = new Message(messageId, examiningEdges, aggregatedData);
     final IRDAG newIRDAG = nemoOptimizer.optimizeAtRunTime(currentIRDAG, message);
+    this.setCurrentIRDAG(newIRDAG);
 
     // Re-compile the IRDAG into a physical plan
     final PhysicalPlan newPhysicalPlan = nemoBackend.compile(newIRDAG);
 
     // Update the physical plan and return
     final List<Stage> currentStages = currentPhysicalPlan.getStageDAG().getTopologicalSort();
     final List<Stage> newStages = newPhysicalPlan.getStageDAG().getTopologicalSort();
-    for (int i = 0; i < currentStages.size(); i++) {
+    IntStream.range(0, currentStages.size()).forEachOrdered(i -> {
       final ExecutionPropertyMap<VertexExecutionProperty> newProperties = newStages.get(i).getExecutionProperties();
       currentStages.get(i).setExecutionProperties(newProperties);
-    }
+      newProperties.get(ParallelismProperty.class).ifPresent(newParallelism -> {
+        currentStages.get(i).getTaskIndices().clear();
+        currentStages.get(i).getTaskIndices().addAll(IntStream.range(0, newParallelism).boxed()
+          .collect(Collectors.toList()));
+        IntStream.range(currentStages.get(i).getVertexIdToReadables().size(), newParallelism).forEach(newIdx ->
+          currentStages.get(i).getVertexIdToReadables().add(new HashMap<>()));
+      });
+    });
     return currentPhysicalPlan;
   }
 
   @Override
-  public void accumulate(final int messageId, final Object data) {
+  public void accumulate(final int messageId, final ControlMessage.RunTimePassType runTimePassType, final Object data) {
+    final Prophet prophet;
+    switch (runTimePassType) {
+      case DataSkewPass:
+        prophet = new SkewProphet((List<ControlMessage.RunTimePassMessageEntry>) data);

Review comment:
       Does this mean that an optimization pass writer must also write a specific prophet implementation (which I believe is part of the compiler backend, not IR)? Maybe move the prophet class into the IR package?
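One way to read this suggestion: if Prophet lived in the IR/common package, each run-time pass could register its own prophet factory, and the plan rewriter would dispatch through a registry instead of hard-coding pass types. The following is a hypothetical, self-contained sketch of that shape — `Prophet`, `ProphetRegistry`, and the string pass keys are illustrative stand-ins, not Nemo's actual API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustrative stand-in for a Prophet interface placed in the common/IR package.
interface Prophet {
  Map<String, Long> calculate();
}

final class ProphetRegistry {
  // Keyed by pass name; each value builds a Prophet from the raw message entries.
  // A new optimization pass registers itself here instead of extending a switch.
  private static final Map<String, Function<List<Map.Entry<String, Long>>, Prophet>> FACTORIES =
    new HashMap<>();

  private ProphetRegistry() { }

  static void register(final String passName,
                       final Function<List<Map.Entry<String, Long>>, Prophet> factory) {
    FACTORIES.put(passName, factory);
  }

  static Prophet create(final String passName, final List<Map.Entry<String, Long>> entries) {
    return FACTORIES.get(passName).apply(entries);
  }
}
```

With this shape, `accumulate` would only need the pass's registered key, keeping the rewriter agnostic to concrete prophet implementations.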
   

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.java
##########
@@ -101,34 +130,45 @@ public PhysicalPlan rewrite(final PhysicalPlan currentPhysicalPlan, final int me
     // Optimize using the Message
     final Message message = new Message(messageId, examiningEdges, aggregatedData);
     final IRDAG newIRDAG = nemoOptimizer.optimizeAtRunTime(currentIRDAG, message);
+    this.setCurrentIRDAG(newIRDAG);
 
     // Re-compile the IRDAG into a physical plan
     final PhysicalPlan newPhysicalPlan = nemoBackend.compile(newIRDAG);
 
     // Update the physical plan and return
     final List<Stage> currentStages = currentPhysicalPlan.getStageDAG().getTopologicalSort();
     final List<Stage> newStages = newPhysicalPlan.getStageDAG().getTopologicalSort();
-    for (int i = 0; i < currentStages.size(); i++) {
+    IntStream.range(0, currentStages.size()).forEachOrdered(i -> {
       final ExecutionPropertyMap<VertexExecutionProperty> newProperties = newStages.get(i).getExecutionProperties();
       currentStages.get(i).setExecutionProperties(newProperties);
-    }
+      newProperties.get(ParallelismProperty.class).ifPresent(newParallelism -> {
+        currentStages.get(i).getTaskIndices().clear();
+        currentStages.get(i).getTaskIndices().addAll(IntStream.range(0, newParallelism).boxed()
+          .collect(Collectors.toList()));
+        IntStream.range(currentStages.get(i).getVertexIdToReadables().size(), newParallelism).forEach(newIdx ->
+          currentStages.get(i).getVertexIdToReadables().add(new HashMap<>()));
+      });
+    });
     return currentPhysicalPlan;
   }
 
   @Override
-  public void accumulate(final int messageId, final Object data) {
+  public void accumulate(final int messageId, final ControlMessage.RunTimePassType runTimePassType, final Object data) {
+    final Prophet prophet;
+    switch (runTimePassType) {

Review comment:
       Please remove this switch statement (same reason as above)

##########
File path: runtime/common/src/main/proto/ControlMessage.proto
##########
@@ -146,14 +146,20 @@ message BlockStateChangedMsg {
 
 message RunTimePassMessageMsg {
     required string taskId = 1;
-    repeated RunTimePassMessageEntry entry = 2;
+    required RunTimePassType runTimePassType = 2;

Review comment:
       Please remove (same reason)

##########
File path: runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
##########
@@ -46,36 +46,46 @@
   private final IRVertex irVertex;
   private final PersistentConnectionToMasterMap connectionToMasterMap;
   private final TaskExecutor taskExecutor;
+  private final ControlMessage.RunTimePassType runTimePassType;
 
   public RunTimeMessageOutputCollector(final String taskId,
                                        final IRVertex irVertex,
                                        final PersistentConnectionToMasterMap connectionToMasterMap,
-                                       final TaskExecutor taskExecutor) {
+                                       final TaskExecutor taskExecutor,
+                                       final ControlMessage.RunTimePassType runTimePassType) {
     this.taskId = taskId;
     this.irVertex = irVertex;
     this.connectionToMasterMap = connectionToMasterMap;
     this.taskExecutor = taskExecutor;
+    this.runTimePassType = runTimePassType;
   }
 
   @Override
   public void emit(final O output) {
-    final Map<Object, Long> aggregatedMessage = (Map<Object, Long>) output;
     final List<ControlMessage.RunTimePassMessageEntry> entries = new ArrayList<>();
-    aggregatedMessage.forEach((key, size) ->
-      entries.add(
-        ControlMessage.RunTimePassMessageEntry.newBuilder()
-          // TODO #325: Add (de)serialization for non-string key types in data metric collection
-          .setKey(key == null ? NULL_KEY : String.valueOf(key))
-          .setValue(size)
-          .build())
-    );
-
+    switch (runTimePassType) {

Review comment:
       Can you remove this switch?
   It'd be better to keep the runtime agnostic to different types of runtime passes.
   This switch seems to suggest that one needs to modify the runtime for every new run-time optimization pass, which is probably not what we want.
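The pass-agnostic shape this comment asks for might look like the sketch below: the transform that computes the metric hands `emit()` a generic key/size map, and the collector serializes it without knowing which run-time pass consumes it. `PassMessageEntry` and `AgnosticCollector` are hypothetical names mirroring the original `emit()` body, not the actual classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for ControlMessage.RunTimePassMessageEntry (a key/value pair).
final class PassMessageEntry {
  final String key;
  final long value;

  PassMessageEntry(final String key, final long value) {
    this.key = key;
    this.value = value;
  }
}

final class AgnosticCollector {
  private static final String NULL_KEY = "NULL_KEY";

  private AgnosticCollector() { }

  // One generic conversion, no switch over pass types: whatever aggregation the
  // pass-specific transform produced is forwarded as opaque key/size entries.
  static List<PassMessageEntry> toEntries(final Map<Object, Long> aggregated) {
    final List<PassMessageEntry> entries = new ArrayList<>();
    aggregated.forEach((key, size) ->
      entries.add(new PassMessageEntry(key == null ? NULL_KEY : String.valueOf(key), size)));
    return entries;
  }
}
```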

##########
File path: compiler/backend/src/main/java/org/apache/nemo/compiler/backend/nemo/prophet/Prophet.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.backend.nemo.prophet;
+
+import java.util.Map;
+
+/**
+ * A prophet class for dynamic optimization.

Review comment:
       Can you provide a more detailed description of this class?

##########
File path: compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SamplingTaskSizingPass.java
##########
@@ -0,0 +1,431 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
+
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.dag.Edge;
+import org.apache.nemo.common.ir.IRDAG;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.EnableDynamicTaskSizingProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.ir.vertex.utility.TaskSizeSplitterVertex;
+import org.apache.nemo.common.ir.vertex.utility.runtimepasstriggervertex.SignalVertex;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.Annotates;
+import org.apache.nemo.runtime.common.plan.StagePartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * Compiler pass for dynamic task size optimization. Happens only when the edge property is SHUFFLE.
+ * If (size of given job) >= 1GB: enable dynamic task sizing optimization.
+ * else:                          break.
+ *
+ *
+ * @Attributes
+ * PARTITIONER_PROPERTY_FOR_SMALL_JOB:  PartitionerProperty for jobs in range of [1GB, 10GB) size.
+ * PARTITIONER_PROPERTY_FOR_MEDIUM_JOB: PartitionerProperty for jobs in range of [10GB, 100GB) size.
+ * PARTITIONER_PROPERTY_FOR_BIG_JOB:    PartitionerProperty for jobs in range of [100GB, - ) size(No upper limit).
+ *
+ * source stage - shuffle edge - current stage - next stage
+ * -> source stage - [curr stage - signal vertex] - next stage
+ * where [] is a splitter vertex
+ */
+@Annotates({EnableDynamicTaskSizingProperty.class, PartitionerProperty.class, SubPartitionSetProperty.class,
+  ParallelismProperty.class})
+public final class SamplingTaskSizingPass extends ReshapingPass {

Review comment:
       Please provide a test for this pass
   
   Ref:
   https://github.com/apache/incubator-nemo/blob/master/examples/beam/src/test/java/org/apache/nemo/examples/beam/PerKeyMedianITCase.java#L72

##########
File path: runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java
##########
@@ -559,6 +560,7 @@ public void send(final ControlMessage.Message message) {
         case RunTimePassMessage:
           scheduler.onRunTimePassMessage(
             // TODO #436: Dynamic task resizing.
+            message.getRunTimePassMessageMsg().getRunTimePassType(),

Review comment:
       Please remove

##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/utility/TaskSizeSplitterVertex.java
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir.vertex.utility;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.LoopVertex;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.ir.vertex.transform.SignalTransform;
+import org.apache.nemo.common.ir.vertex.utility.runtimepasstriggervertex.SignalVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * This vertex works as a partition-based sampling vertex of dynamic task sizing pass.
+ * It covers both sampling vertices and optimized vertices known from sampling by iterating same vertices, giving
+ * different properties in each iteration.
+ */
+public final class TaskSizeSplitterVertex extends LoopVertex {
+  // Information about original(before splitting) vertices
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSizeSplitterVertex.class.getName());
+  private final Set<IRVertex> originalVertices;
+  // Vertex which has incoming edge from other stages. Guaranteed to be only one in each stage by stage partitioner
+  private final Set<IRVertex> firstVerticesInStage;
+  // vertices which has outgoing edge to other stages. Can be more than one in one stage
+  private final Set<IRVertex> verticesWithStageOutgoingEdges;
+  // vertices which does not have any outgoing edge to vertices in same stage
+  private final Set<IRVertex> lastVerticesInStage;
+
+  // Information about partition sizes
+  private final int partitionerProperty;
+
+  // Information about splitter vertex's iteration
+  private int testingTrial;
+
+  private final Map<IRVertex, IRVertex> mapOfOriginalVertexToClone = new HashMap<>();
+
+  /**
+   * Default constructor of TaskSizeSplitterVertex.
+   * @param splitterVertexName              for now, this doesn't do anything. This is inserted to enable extension
+   *                                        from LoopVertex.
+   * @param originalVertices                Set of vertices which form one stage and which splitter will wrap up.
+   * @param firstVerticesInStage            The first vertex in stage. Although it is given as a Set, we assert that
+   *                                        this set has only one element (guaranteed by stage partitioner logic)
+   * @param verticesWithStageOutgoingEdges  Vertices which has outgoing edges to other stage.
+   * @param lastVerticesInStage             Vertices which has only outgoing edges to other stage.
+   * @param partitionerProperty             PartitionerProperty of incoming stage edge regarding to job data size.
+   *                                        For more information, check
+   */
+  public TaskSizeSplitterVertex(final String splitterVertexName,
+                                final Set<IRVertex> originalVertices,
+                                final Set<IRVertex> firstVerticesInStage,
+                                final Set<IRVertex> verticesWithStageOutgoingEdges,
+                                final Set<IRVertex> lastVerticesInStage,
+                                final int partitionerProperty) {
+    super(splitterVertexName); // need to take care of here
+    this.testingTrial = 0;
+    this.originalVertices = originalVertices;
+    this.partitionerProperty = partitionerProperty;
+    for (IRVertex original : originalVertices) {
+      mapOfOriginalVertexToClone.putIfAbsent(original, original.getClone());
+    }
+    this.firstVerticesInStage = firstVerticesInStage;
+    this.verticesWithStageOutgoingEdges = verticesWithStageOutgoingEdges;
+    this.lastVerticesInStage = lastVerticesInStage;
+  }
+
+  // Getters of attributes
+  public Set<IRVertex> getOriginalVertices() {
+    return originalVertices;
+  }
+
+  public Set<IRVertex> getFirstVerticesInStage() {
+    return firstVerticesInStage;
+  }
+
+  public Set<IRVertex> getVerticesWithStageOutgoingEdges() {
+    return verticesWithStageOutgoingEdges;
+  }
+
+  /**
+   * Insert vertices from original dag. This does not harm their topological order.
+   * @param stageVertices   vertices to insert. can be same as OriginalVertices.
+   * @param edgesInBetween  edges connecting stageVertices. This stage does not contain any edge
+   *                        that are connected to vertices other than those in stageVertices.
+   *                        (Both ends need to be the element of stageVertices)
+   */
+  public void insertWorkingVertices(final Set<IRVertex> stageVertices, final Set<IREdge> edgesInBetween) {
+    stageVertices.forEach(vertex -> getBuilder().addVertex(vertex));
+    edgesInBetween.forEach(edge -> getBuilder().connectVertices(edge));
+  }
+
+  /**
+   * Inserts signal Vertex at the end of the iteration. Last iteration does not contain any signal vertex.
+   * (stage finishing vertices) - dummyShuffleEdge - SignalVertex
+   * SignalVertex - ControlEdge - (stage starting vertices)
+   * @param toInsert              SignalVertex to insert.
+   */
+  public void insertSignalVertex(final SignalVertex toInsert) {
+    getBuilder().addVertex(toInsert);
+    for (IRVertex lastVertex : lastVerticesInStage) {
+      IREdge edgeToSignal = Util.createControlEdge(lastVertex, toInsert);
+      getBuilder().connectVertices(edgeToSignal);
+      for (IRVertex firstVertex : firstVerticesInStage) {
+        IREdge controlEdgeToBeginning = Util.createControlEdge(toInsert, firstVertex);
+        addIterativeIncomingEdge(controlEdgeToBeginning);
+      }
+    }
+  }
+
+  public void increaseTestingTrial() {
+    testingTrial++;
+  }
+
+  /**
+   * Need to be careful about Signal Vertex, because they do not appear in the last iteration.
+   * @param dagBuilder DAGBuilder to add the unrolled iteration to.
+   * @return Modified this object
+   */
+  public TaskSizeSplitterVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder) {
+    final HashMap<IRVertex, IRVertex> originalToNewIRVertex = new HashMap<>();
+    final HashSet<IRVertex> originalUtilityVertices = new HashSet<>();
+    final HashSet<IREdge> edgesToOptimize = new HashSet<>();
+
+    final List<OperatorVertex> previousSignalVertex = new ArrayList<>(1);
+    final DAG<IRVertex, IREdge> dagToAdd = getDAG();
+
+    decreaseMaxNumberOfIterations();
+
+    // add the working vertex and its incoming edges to the dagBuilder.
+    dagToAdd.topologicalDo(irVertex -> {
+      if (!(irVertex instanceof SignalVertex)) {
+        final IRVertex newIrVertex = irVertex.getClone();
+        setParallelismPropertyByTestingTrial(newIrVertex);
+        originalToNewIRVertex.putIfAbsent(irVertex, newIrVertex);
+        dagBuilder.addVertex(newIrVertex, dagToAdd);
+        dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
+          final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
+          final IREdge newIrEdge =
+            new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), newSrc, newIrVertex);
+          edge.copyExecutionPropertiesTo(newIrEdge);
+          setSubPartitionSetPropertyByTestingTrial(newIrEdge);
+          edgesToOptimize.add(newIrEdge);
+          dagBuilder.connectVertices(newIrEdge);
+        });
+      } else {
+        originalUtilityVertices.add(irVertex);
+      }
+    });
+
+    // process the initial DAG incoming edges for the first loop.
+    getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
+      final IREdge newIrEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
+        edge.getSrc(), originalToNewIRVertex.get(dstVertex));
+      edge.copyExecutionPropertiesTo(newIrEdge);
+      setSubPartitionSetPropertyByTestingTrial(newIrEdge);
+      if (edge.getSrc() instanceof OperatorVertex
+        && ((OperatorVertex) edge.getSrc()).getTransform() instanceof SignalTransform) {
+        previousSignalVertex.add((OperatorVertex) edge.getSrc());
+      } else {
+        edgesToOptimize.add(newIrEdge);
+      }
+      dagBuilder.connectVertices(newIrEdge);
+    }));
+
+    getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edgeFromOriginal -> {
+      for (Map.Entry<IREdge, IREdge> entry : this.getEdgeWithInternalVertexToEdgeWithLoop().entrySet()) {
+        if (entry.getKey().getId().equals(edgeFromOriginal.getId())) {
+          final IREdge correspondingEdge = entry.getValue(); // edge to next splitter vertex
+          if (correspondingEdge.getDst() instanceof TaskSizeSplitterVertex) {
+            TaskSizeSplitterVertex nextSplitter = (TaskSizeSplitterVertex) correspondingEdge.getDst();
+            IRVertex dstVertex = edgeFromOriginal.getDst(); // vertex inside of next splitter vertex
+            List<IREdge> edgesToDelete = new ArrayList<>();
+            List<IREdge> edgesToAdd = new ArrayList<>();
+            for (IREdge edgeToDst : nextSplitter.getDagIncomingEdges().get(dstVertex)) {
+              if (edgeToDst.getSrc().getId().equals(srcVertex.getId())) {
+                final IREdge newIrEdge = new IREdge(
+                  edgeFromOriginal.getPropertyValue(CommunicationPatternProperty.class).get(),
+                  originalToNewIRVertex.get(srcVertex),
+                  edgeFromOriginal.getDst());
+                edgeToDst.copyExecutionPropertiesTo(newIrEdge);
+                edgesToDelete.add(edgeToDst);
+                edgesToAdd.add(newIrEdge);
+                final IREdge newLoopEdge = Util.cloneEdge(
+                  correspondingEdge, newIrEdge.getSrc(), correspondingEdge.getDst());
+                nextSplitter.mapEdgeWithLoop(newLoopEdge, newIrEdge);
+              }
+            }
+            if (loopTerminationConditionMet()) {
+              for (IREdge edgeToDelete : edgesToDelete) {
+                nextSplitter.removeDagIncomingEdge(edgeToDelete);
+                nextSplitter.removeNonIterativeIncomingEdge(edgeToDelete);
+              }
+            }
+            for (IREdge edgeToAdd : edgesToAdd) {
+              nextSplitter.addDagIncomingEdge(edgeToAdd);
+              nextSplitter.addNonIterativeIncomingEdge(edgeToAdd);
+            }
+          } else {
+            final IREdge newIrEdge = new IREdge(
+              edgeFromOriginal.getPropertyValue(CommunicationPatternProperty.class).get(),
+              originalToNewIRVertex.get(srcVertex), edgeFromOriginal.getDst());
+            edgeFromOriginal.copyExecutionPropertiesTo(newIrEdge);
+            dagBuilder.addVertex(edgeFromOriginal.getDst()).connectVertices(newIrEdge);
+          }
+        }
+      }
+    }));
+
+    // if loop termination condition is false, add signal vertex
+    if (!loopTerminationConditionMet()) {
+      for (IRVertex helper : originalUtilityVertices) {
+        final IRVertex newHelper = helper.getClone();
+        originalToNewIRVertex.putIfAbsent(helper, newHelper);
+        setParallelismPropertyByTestingTrial(newHelper);
+        dagBuilder.addVertex(newHelper, dagToAdd);
+        dagToAdd.getIncomingEdgesOf(helper).forEach(edge -> {
+          final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
+          final IREdge newIrEdge =
+            new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), newSrc, newHelper);
+          edge.copyExecutionPropertiesTo(newIrEdge);
+          dagBuilder.connectVertices(newIrEdge);
+        });
+      }
+    }
+
+    // assign signal vertex of n-th iteration with nonIterativeIncomingEdges of (n+1)th iteration
+    markEdgesToOptimize(previousSignalVertex, edgesToOptimize);
+
+    // process next iteration's DAG incoming edges, and add them as the next loop's incoming edges:
+    // clear, as we're done with the current loop and need to prepare it for the next one.
+    this.getDagIncomingEdges().clear();
+    this.getNonIterativeIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
+    if (!loopTerminationConditionMet()) {
+      this.getIterativeIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
+        final IREdge newIrEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
+          originalToNewIRVertex.get(edge.getSrc()), dstVertex);
+        edge.copyExecutionPropertiesTo(newIrEdge);
+        this.addDagIncomingEdge(newIrEdge);
+      }));
+    }
+
+    increaseTestingTrial();
+    return this;
+  }
+
+  // private helper methods
+
+  /**
+   * Set Parallelism Property of internal vertices by unroll iteration.
+   * @param irVertex    vertex to set parallelism property.
+   */
+  private void setParallelismPropertyByTestingTrial(final IRVertex irVertex) {
+    if (testingTrial == 0 && !(irVertex instanceof OperatorVertex
+      && ((OperatorVertex) irVertex).getTransform() instanceof SignalTransform)) {
+      irVertex.setPropertyPermanently(ParallelismProperty.of(32));
+    } else {
+      irVertex.setProperty(ParallelismProperty.of(1));
+    }
+  }
+
+  /**
+   * Set SubPartitionSetProperty of given edge by unroll iteration.
+   * @param edge    edge to set subPartitionSetProperty
+   */
+  private void setSubPartitionSetPropertyByTestingTrial(final IREdge edge) {
+    final ArrayList<KeyRange> partitionSet = new ArrayList<>();
+    int taskIndex = 0;
+    if (testingTrial == 0) {
+      for (int i = 0; i < 4; i++) {
+        partitionSet.add(taskIndex, HashRange.of(i, i + 1));
+        taskIndex++;
+      }
+      for (int groupStartingIndex = 4; groupStartingIndex < 512; groupStartingIndex *= 2) {
+        int growingFactor = groupStartingIndex / 4;
+        for (int startIndex = groupStartingIndex; startIndex < groupStartingIndex * 2; startIndex += growingFactor) {
+          partitionSet.add(taskIndex, HashRange.of(startIndex, startIndex + growingFactor));
+          taskIndex++;
+        }
+      }
+      edge.setProperty(SubPartitionSetProperty.of(partitionSet));
+    } else {
+      partitionSet.add(0, HashRange.of(512, partitionerProperty)); // 31+testingTrial
+      edge.setProperty(SubPartitionSetProperty.of(partitionSet));
+    }
+  }
+
+  /**
+   * Mark edges for DTS (i.e. incoming edges of second iteration vertices).
+   * @param toAssign          Signal Vertex to get MessageIdVertexProperty
+   * @param edgesToOptimize   Edges to mark for DTS
+   */
+  private void markEdgesToOptimize(final List<OperatorVertex> toAssign, final Set<IREdge> edgesToOptimize) {
+    if (testingTrial > 0) {
+      edgesToOptimize.forEach(edge -> {
+        if (!edge.getDst().getPropertyValue(ParallelismProperty.class).get().equals(1)) {
+          throw new IllegalArgumentException("Target edges should begin with Parallelism of 1.");
+        }
+        final HashSet<Integer> msgEdgeIds =
+          edge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0));
+        msgEdgeIds.add(toAssign.get(0).getPropertyValue(MessageIdVertexProperty.class).get());
+        edge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds));
+      });
+    }
+  }
+
+  public void printLogs() {
+    LOG.error("[Vertex] this is splitter {}", this.getId());

Review comment:
       error -> info ?
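For reference, the first-trial sub-partition layout built by `setSubPartitionSetPropertyByTestingTrial` in the diff above can be reproduced standalone. Under my reading of that loop, it emits four unit ranges [0,1)..[3,4), then doubling groups starting at 4, 8, ..., 256, each split into four equal ranges — 32 ranges in total, covering [0, 512) and matching the parallelism of 32 set for the first trial:

```java
import java.util.ArrayList;
import java.util.List;

// Standalone reconstruction of the first-trial (testingTrial == 0) layout;
// ranges are int[]{startInclusive, endExclusive}, mirroring HashRange.of(s, e).
final class SubPartitionLayout {
  private SubPartitionLayout() { }

  static List<int[]> firstTrialRanges() {
    final List<int[]> ranges = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      ranges.add(new int[]{i, i + 1}); // four unit-sized ranges
    }
    for (int groupStart = 4; groupStart < 512; groupStart *= 2) {
      final int step = groupStart / 4; // each group is cut into four equal ranges
      for (int start = groupStart; start < groupStart * 2; start += step) {
        ranges.add(new int[]{start, start + step});
      }
    }
    return ranges;
  }
}
```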

##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/utility/TaskSizeSplitterVertex.java
##########
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.ir.vertex.utility;
+
+import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.common.Util;
+import org.apache.nemo.common.dag.DAG;
+import org.apache.nemo.common.dag.DAGBuilder;
+import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.SubPartitionSetProperty;
+import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.ir.vertex.LoopVertex;
+import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.ir.vertex.executionproperty.MessageIdVertexProperty;
+import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import org.apache.nemo.common.ir.vertex.transform.SignalTransform;
+import org.apache.nemo.common.ir.vertex.utility.runtimepasstriggervertex.SignalVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * This vertex serves as a partition-based sampling vertex for the dynamic task sizing pass.
+ * It covers both the sampling vertices and the vertices optimized from the sampled metrics by iterating
+ * over the same set of vertices, assigning different properties on each iteration.
+ */
+public final class TaskSizeSplitterVertex extends LoopVertex {
+  // Information about original(before splitting) vertices
+  private static final Logger LOG = LoggerFactory.getLogger(TaskSizeSplitterVertex.class.getName());
+  private final Set<IRVertex> originalVertices;
+  // Vertex which has an incoming edge from another stage. The stage partitioner guarantees exactly one per stage.
+  private final Set<IRVertex> firstVerticesInStage;
+  // Vertices which have outgoing edges to other stages. There can be more than one per stage.
+  private final Set<IRVertex> verticesWithStageOutgoingEdges;
+  // Vertices which do not have any outgoing edge to vertices in the same stage.
+  private final Set<IRVertex> lastVerticesInStage;
+
+  // Information about partition sizes
+  private final int partitionerProperty;
+
+  // Information about splitter vertex's iteration
+  private int testingTrial;
+
+  private final Map<IRVertex, IRVertex> mapOfOriginalVertexToClone = new HashMap<>();
+
+  /**
+   * Default constructor of TaskSizeSplitterVertex.
+   * @param splitterVertexName              for now, this doesn't do anything; it is inserted to enable extension
+   *                                        from LoopVertex.
+   * @param originalVertices                Set of vertices which form one stage and which the splitter will wrap.
+   * @param firstVerticesInStage            The first vertex in the stage. Although it is given as a Set, we assert
+   *                                        that it has only one element (guaranteed by the stage partitioner logic).
+   * @param verticesWithStageOutgoingEdges  Vertices which have outgoing edges to other stages.
+   * @param lastVerticesInStage             Vertices which have only outgoing edges to other stages.
+   * @param partitionerProperty             PartitionerProperty of the incoming stage edge, determined by the job
+   *                                        data size.
+   */
+  public TaskSizeSplitterVertex(final String splitterVertexName,
+                                final Set<IRVertex> originalVertices,
+                                final Set<IRVertex> firstVerticesInStage,
+                                final Set<IRVertex> verticesWithStageOutgoingEdges,
+                                final Set<IRVertex> lastVerticesInStage,
+                                final int partitionerProperty) {
+    super(splitterVertexName); // need to take care of here
+    this.testingTrial = 0;
+    this.originalVertices = originalVertices;
+    this.partitionerProperty = partitionerProperty;
+    for (IRVertex original : originalVertices) {
+      mapOfOriginalVertexToClone.putIfAbsent(original, original.getClone());
+    }
+    this.firstVerticesInStage = firstVerticesInStage;
+    this.verticesWithStageOutgoingEdges = verticesWithStageOutgoingEdges;
+    this.lastVerticesInStage = lastVerticesInStage;
+  }
+
+  // Getters of attributes
+  public Set<IRVertex> getOriginalVertices() {
+    return originalVertices;
+  }
+
+  public Set<IRVertex> getFirstVerticesInStage() {
+    return firstVerticesInStage;
+  }
+
+  public Set<IRVertex> getVerticesWithStageOutgoingEdges() {
+    return verticesWithStageOutgoingEdges;
+  }
+
+  /**
+   * Inserts vertices from the original DAG. This preserves their topological order.
+   * @param stageVertices   vertices to insert; can be the same as originalVertices.
+   * @param edgesInBetween  edges connecting stageVertices. This set must not contain any edge
+   *                        connected to a vertex outside of stageVertices.
+   *                        (Both ends need to be elements of stageVertices.)
+   */
+  public void insertWorkingVertices(final Set<IRVertex> stageVertices, final Set<IREdge> edgesInBetween) {
+    stageVertices.forEach(vertex -> getBuilder().addVertex(vertex));
+    edgesInBetween.forEach(edge -> getBuilder().connectVertices(edge));
+  }
+
+  /**
+   * Inserts a SignalVertex at the end of an iteration. The last iteration does not contain any signal vertex.
+   * (stage finishing vertices) - ControlEdge - SignalVertex
+   * SignalVertex - ControlEdge - (stage starting vertices)
+   * @param toInsert              SignalVertex to insert.
+   */
+  public void insertSignalVertex(final SignalVertex toInsert) {
+    getBuilder().addVertex(toInsert);
+    for (IRVertex lastVertex : lastVerticesInStage) {
+      IREdge edgeToSignal = Util.createControlEdge(lastVertex, toInsert);
+      getBuilder().connectVertices(edgeToSignal);
+      for (IRVertex firstVertex : firstVerticesInStage) {
+        IREdge controlEdgeToBeginning = Util.createControlEdge(toInsert, firstVertex);
+        addIterativeIncomingEdge(controlEdgeToBeginning);
+      }
+    }
+  }
+
+  public void increaseTestingTrial() {
+    testingTrial++;
+  }
+
+  /**
+   * Unrolls a single iteration of this vertex. We need to be careful about signal vertices,
+   * because they do not appear in the last iteration.
+   * @param dagBuilder DAGBuilder to add the unrolled iteration to.
+   * @return this object, with the iteration unrolled.
+   */
+  public TaskSizeSplitterVertex unRollIteration(final DAGBuilder<IRVertex, IREdge> dagBuilder) {
+    final HashMap<IRVertex, IRVertex> originalToNewIRVertex = new HashMap<>();
+    final HashSet<IRVertex> originalUtilityVertices = new HashSet<>();
+    final HashSet<IREdge> edgesToOptimize = new HashSet<>();
+
+    final List<OperatorVertex> previousSignalVertex = new ArrayList<>(1);
+    final DAG<IRVertex, IREdge> dagToAdd = getDAG();
+
+    decreaseMaxNumberOfIterations();
+
+    // add the working vertex and its incoming edges to the dagBuilder.
+    dagToAdd.topologicalDo(irVertex -> {
+      if (!(irVertex instanceof SignalVertex)) {
+        final IRVertex newIrVertex = irVertex.getClone();
+        setParallelismPropertyByTestingTrial(newIrVertex);
+        originalToNewIRVertex.putIfAbsent(irVertex, newIrVertex);
+        dagBuilder.addVertex(newIrVertex, dagToAdd);
+        dagToAdd.getIncomingEdgesOf(irVertex).forEach(edge -> {
+          final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
+          final IREdge newIrEdge =
+            new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), newSrc, newIrVertex);
+          edge.copyExecutionPropertiesTo(newIrEdge);
+          setSubPartitionSetPropertyByTestingTrial(newIrEdge);
+          edgesToOptimize.add(newIrEdge);
+          dagBuilder.connectVertices(newIrEdge);
+        });
+      } else {
+        originalUtilityVertices.add(irVertex);
+      }
+    });
+
+    // process the initial DAG incoming edges for the first loop.
+    getDagIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
+      final IREdge newIrEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
+        edge.getSrc(), originalToNewIRVertex.get(dstVertex));
+      edge.copyExecutionPropertiesTo(newIrEdge);
+      setSubPartitionSetPropertyByTestingTrial(newIrEdge);
+      if (edge.getSrc() instanceof OperatorVertex
+        && ((OperatorVertex) edge.getSrc()).getTransform() instanceof SignalTransform) {
+        previousSignalVertex.add((OperatorVertex) edge.getSrc());
+      } else {
+        edgesToOptimize.add(newIrEdge);
+      }
+      dagBuilder.connectVertices(newIrEdge);
+    }));
+
+    getDagOutgoingEdges().forEach((srcVertex, irEdges) -> irEdges.forEach(edgeFromOriginal -> {
+      for (Map.Entry<IREdge, IREdge> entry : this.getEdgeWithInternalVertexToEdgeWithLoop().entrySet()) {
+        if (entry.getKey().getId().equals(edgeFromOriginal.getId())) {
+          final IREdge correspondingEdge = entry.getValue(); // edge to next splitter vertex
+          if (correspondingEdge.getDst() instanceof TaskSizeSplitterVertex) {
+            TaskSizeSplitterVertex nextSplitter = (TaskSizeSplitterVertex) correspondingEdge.getDst();
+            IRVertex dstVertex = edgeFromOriginal.getDst(); // vertex inside of next splitter vertex
+            List<IREdge> edgesToDelete = new ArrayList<>();
+            List<IREdge> edgesToAdd = new ArrayList<>();
+            for (IREdge edgeToDst : nextSplitter.getDagIncomingEdges().get(dstVertex)) {
+              if (edgeToDst.getSrc().getId().equals(srcVertex.getId())) {
+                final IREdge newIrEdge = new IREdge(
+                  edgeFromOriginal.getPropertyValue(CommunicationPatternProperty.class).get(),
+                  originalToNewIRVertex.get(srcVertex),
+                  edgeFromOriginal.getDst());
+                edgeToDst.copyExecutionPropertiesTo(newIrEdge);
+                edgesToDelete.add(edgeToDst);
+                edgesToAdd.add(newIrEdge);
+                final IREdge newLoopEdge = Util.cloneEdge(
+                  correspondingEdge, newIrEdge.getSrc(), correspondingEdge.getDst());
+                nextSplitter.mapEdgeWithLoop(newLoopEdge, newIrEdge);
+              }
+            }
+            if (loopTerminationConditionMet()) {
+              for (IREdge edgeToDelete : edgesToDelete) {
+                nextSplitter.removeDagIncomingEdge(edgeToDelete);
+                nextSplitter.removeNonIterativeIncomingEdge(edgeToDelete);
+              }
+            }
+            for (IREdge edgeToAdd : edgesToAdd) {
+              nextSplitter.addDagIncomingEdge(edgeToAdd);
+              nextSplitter.addNonIterativeIncomingEdge(edgeToAdd);
+            }
+          } else {
+            final IREdge newIrEdge = new IREdge(
+              edgeFromOriginal.getPropertyValue(CommunicationPatternProperty.class).get(),
+              originalToNewIRVertex.get(srcVertex), edgeFromOriginal.getDst());
+            edgeFromOriginal.copyExecutionPropertiesTo(newIrEdge);
+            dagBuilder.addVertex(edgeFromOriginal.getDst()).connectVertices(newIrEdge);
+          }
+        }
+      }
+    }));
+
+    // if loop termination condition is false, add signal vertex
+    if (!loopTerminationConditionMet()) {
+      for (IRVertex helper : originalUtilityVertices) {
+        final IRVertex newHelper = helper.getClone();
+        originalToNewIRVertex.putIfAbsent(helper, newHelper);
+        setParallelismPropertyByTestingTrial(newHelper);
+        dagBuilder.addVertex(newHelper, dagToAdd);
+        dagToAdd.getIncomingEdgesOf(helper).forEach(edge -> {
+          final IRVertex newSrc = originalToNewIRVertex.get(edge.getSrc());
+          final IREdge newIrEdge =
+            new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), newSrc, newHelper);
+          edge.copyExecutionPropertiesTo(newIrEdge);
+          dagBuilder.connectVertices(newIrEdge);
+        });
+      }
+    }
+
+    // assign signal vertex of n-th iteration with nonIterativeIncomingEdges of (n+1)th iteration
+    markEdgesToOptimize(previousSignalVertex, edgesToOptimize);
+
+    // process next iteration's DAG incoming edges, and add them as the next loop's incoming edges:
+    // clear, as we're done with the current loop and need to prepare it for the next one.
+    this.getDagIncomingEdges().clear();
+    this.getNonIterativeIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(this::addDagIncomingEdge));
+    if (!loopTerminationConditionMet()) {
+      this.getIterativeIncomingEdges().forEach((dstVertex, irEdges) -> irEdges.forEach(edge -> {
+        final IREdge newIrEdge = new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(),
+          originalToNewIRVertex.get(edge.getSrc()), dstVertex);
+        edge.copyExecutionPropertiesTo(newIrEdge);
+        this.addDagIncomingEdge(newIrEdge);
+      }));
+    }
+
+    increaseTestingTrial();
+    return this;
+  }
+
+  // private helper methods
+
+  /**
+   * Sets the ParallelismProperty of an internal vertex according to the current unrolled iteration.
+   * @param irVertex    vertex to set the parallelism property of.
+   */
+  private void setParallelismPropertyByTestingTrial(final IRVertex irVertex) {
+    if (testingTrial == 0 && !(irVertex instanceof OperatorVertex
+      && ((OperatorVertex) irVertex).getTransform() instanceof SignalTransform)) {
+      irVertex.setPropertyPermanently(ParallelismProperty.of(32));
+    } else {
+      irVertex.setProperty(ParallelismProperty.of(1));
+    }
+  }
+
+  /**
+   * Sets the SubPartitionSetProperty of the given edge according to the current unrolled iteration.
+   * @param edge    edge to set the SubPartitionSetProperty of.
+   */
+  private void setSubPartitionSetPropertyByTestingTrial(final IREdge edge) {
+    final ArrayList<KeyRange> partitionSet = new ArrayList<>();
+    int taskIndex = 0;
+    if (testingTrial == 0) {
+      for (int i = 0; i < 4; i++) {
+        partitionSet.add(taskIndex, HashRange.of(i, i + 1));
+        taskIndex++;
+      }
+      for (int groupStartingIndex = 4; groupStartingIndex < 512; groupStartingIndex *= 2) {
+        int growingFactor = groupStartingIndex / 4;
+        for (int startIndex = groupStartingIndex; startIndex < groupStartingIndex * 2; startIndex += growingFactor) {
+          partitionSet.add(taskIndex, HashRange.of(startIndex, startIndex + growingFactor));
+          taskIndex++;
+        }
+      }
+      edge.setProperty(SubPartitionSetProperty.of(partitionSet));
+    } else {
+      partitionSet.add(0, HashRange.of(512, partitionerProperty)); // 31+testingTrial
+      edge.setProperty(SubPartitionSetProperty.of(partitionSet));
+    }
+  }
+
+  /**
+   * Marks edges for DTS (i.e., the incoming edges of second-iteration vertices).
+   * @param toAssign          SignalVertex to get the MessageIdVertexProperty from.
+   * @param edgesToOptimize   Edges to mark for DTS.
+   */
+  private void markEdgesToOptimize(final List<OperatorVertex> toAssign, final Set<IREdge> edgesToOptimize) {
+    if (testingTrial > 0) {
+      edgesToOptimize.forEach(edge -> {
+        if (!edge.getDst().getPropertyValue(ParallelismProperty.class).get().equals(1)) {
+          throw new IllegalArgumentException("Target edges should begin with Parallelism of 1.");
+        }
+        final HashSet<Integer> msgEdgeIds =
+          edge.getPropertyValue(MessageIdEdgeProperty.class).orElse(new HashSet<>(0));
+        msgEdgeIds.add(toAssign.get(0).getPropertyValue(MessageIdVertexProperty.class).get());
+        edge.setProperty(MessageIdEdgeProperty.of(msgEdgeIds));
+      });
+    }
+  }
+
+  public void printLogs() {

Review comment:
       override toString()?
   https://www.tutorialspoint.com/java/number_tostring.htm
   
   please refer to other classes in the repository
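Following up on the suggestion above, here is a minimal sketch of what replacing a `printLogs()`-style method with a `toString()` override could look like. The class and field names below are illustrative stand-ins, not the actual `TaskSizeSplitterVertex` fields:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for TaskSizeSplitterVertex, showing the
// toString() pattern suggested above instead of a printLogs() method.
final class SplitterVertexSketch {
  private final String name;
  private final Set<String> originalVertexIds;
  private int testingTrial;

  SplitterVertexSketch(final String name, final Set<String> originalVertexIds) {
    this.name = name;
    this.originalVertexIds = originalVertexIds;
    this.testingTrial = 0;
  }

  void increaseTestingTrial() {
    testingTrial++;
  }

  @Override
  public String toString() {
    // A logging framework can then print the vertex directly, e.g. LOG.info("{}", vertex);
    return "SplitterVertexSketch{name=" + name
      + ", originalVertexIds=" + originalVertexIds
      + ", testingTrial=" + testingTrial + '}';
  }

  public static void main(final String[] args) {
    final Set<String> ids = new HashSet<>();
    ids.add("vertex1");
    final SplitterVertexSketch vertex = new SplitterVertexSketch("Splitter1", ids);
    vertex.increaseTestingTrial();
    System.out.println(vertex);
  }
}
```

With this pattern, the SLF4J logger already present in the class can log the vertex state without a dedicated print method.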
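For context on `setSubPartitionSetPropertyByTestingTrial` in the diff above: on the first trial it builds 4 unit-sized hash ranges, then 7 groups ([4,8), [8,16), ... [256,512)) of 4 geometrically growing ranges each, i.e. 32 contiguous ranges covering [0, 512) in total, matching the parallelism of 32 set in `setParallelismPropertyByTestingTrial`. A standalone reconstruction of that range construction (plain int pairs stand in for Nemo's `HashRange`):

```java
import java.util.ArrayList;
import java.util.List;

// Standalone reconstruction of the first-trial partition-set layout:
// 4 unit ranges, then groups [4,8), [8,16), ... [256,512), each split into 4 ranges.
final class SubPartitionSketch {
  static List<int[]> firstTrialPartitionSet() {
    final List<int[]> partitionSet = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      partitionSet.add(new int[]{i, i + 1}); // unit ranges [0,1) .. [3,4)
    }
    for (int group = 4; group < 512; group *= 2) {
      final int growingFactor = group / 4; // each group contributes exactly 4 ranges
      for (int start = group; start < group * 2; start += growingFactor) {
        partitionSet.add(new int[]{start, start + growingFactor});
      }
    }
    return partitionSet;
  }

  public static void main(final String[] args) {
    final List<int[]> set = firstTrialPartitionSet();
    System.out.println("ranges: " + set.size()); // 32 ranges in total
    final int[] last = set.get(set.size() - 1);
    System.out.println("last: [" + last[0] + ", " + last[1] + ")"); // [448, 512)
  }
}
```

The ranges are contiguous, so together they cover every hash key in [0, 512) exactly once.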

##########
File path: common/src/main/java/org/apache/nemo/common/ir/vertex/utility/TaskSizeSplitterVertex.java
##########
@@ -0,0 +1,345 @@
(license header, imports, and class javadoc identical to the hunk quoted above)
+public final class TaskSizeSplitterVertex extends LoopVertex {

Review comment:
       Personally I don't think it is a good idea to extend LoopVertex, which expresses iterations in applications, to implement this vertex which serves a quite different purpose. There is also no integrity checker for LoopVertex (all of the other utility vertices have corresponding checkers), so this introduces one more utility vertex with no integrity checker.
   
   If there is some common logic between the two, it'd be good to refactor that into a separate class, and make this vertex and the LoopVertex depend on that. And then it'd be really nice to add checkers for this vertex in the IRDAG integrity checker, and also add unit tests to test the checker. But I don't want to slow down the progress and will merge if you want to go ahead with the current implementation.
   
   IRDAGChecker:
   https://github.com/apache/incubator-nemo/blob/master/common/src/main/java/org/apache/nemo/common/ir/IRDAGChecker.java
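To illustrate the checker suggestion, here is a hedged sketch of what a single integrity check for the splitter vertex could look like. The class and method names are hypothetical and the real IRDAGChecker API differs; this only demonstrates the shape of the check, using the stage-partitioner guarantee from the constructor javadoc (exactly one first vertex per stage):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified integrity check for a splitter-like vertex:
// the "first vertices" set is expected to contain exactly one element,
// mirroring the stage-partitioner guarantee asserted in the constructor javadoc.
final class SplitterIntegrityCheckSketch {
  static String checkSingleFirstVertex(final Set<String> firstVerticesInStage) {
    if (firstVerticesInStage.size() != 1) {
      return "FAIL: splitter must have exactly one first vertex, found " + firstVerticesInStage.size();
    }
    return "PASS";
  }

  public static void main(final String[] args) {
    final Set<String> ok = new HashSet<>();
    ok.add("v1");
    System.out.println(checkSingleFirstVertex(ok)); // PASS

    final Set<String> bad = new HashSet<>();
    bad.add("v1");
    bad.add("v2");
    System.out.println(checkSingleFirstVertex(bad)); // FAIL: ...
  }
}
```

In the actual IRDAGChecker this would be registered as one of the per-vertex checks and exercised by a unit test, alongside the checkers for the other utility vertices.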
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org