You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/08/14 09:51:44 UTC

[GitHub] johnyangk closed pull request #99: [NEMO-64] Fix map stage hang under DataSkewPolicy

johnyangk closed pull request #99: [NEMO-64] Fix map stage hang under DataSkewPolicy
URL: https://github.com/apache/incubator-nemo/pull/99
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
index f0ae953de..595085428 100644
--- a/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
+++ b/client/src/main/java/edu/snu/nemo/client/JobLauncher.java
@@ -315,6 +315,7 @@ public static Configuration getJobConf(final String[] args) throws IOException,
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumListeningThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportServerNumWorkingThreads.class);
     cl.registerShortNameOfClass(JobConf.PartitionTransportClientNumThreads.class);
+    cl.registerShortNameOfClass(JobConf.MaxNumDownloadsForARuntimeEdge.class);
     cl.processCommandLine(args);
     return confBuilder.build();
   }
diff --git a/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
new file mode 100644
index 000000000..2847d8da2
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/DataSkewMetricFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.util.Map;
+
+/**
+ * A {@link MetricFactory} which is used for data skew handling.
+ */
+public final class DataSkewMetricFactory implements MetricFactory<Map<Integer, KeyRange>> {
+  private Map<Integer, KeyRange> metric;
+
+  /**
+   * Default constructor.
+   */
+  public DataSkewMetricFactory(final Map<Integer, KeyRange> metric) {
+    this.metric = metric;
+  }
+
+  public Map<Integer, KeyRange> getMetric() {
+    return metric;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java b/common/src/main/java/edu/snu/nemo/common/HashRange.java
similarity index 90%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
rename to common/src/main/java/edu/snu/nemo/common/HashRange.java
index 50e43349e..cdc7d49dd 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/HashRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/HashRange.java
@@ -13,7 +13,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
+
+import java.util.Arrays;
 
 /**
  * Descriptor for hash range.
@@ -109,22 +111,23 @@ public boolean equals(final Object o) {
       return false;
     }
     final HashRange hashRange = (HashRange) o;
-    if (rangeBeginInclusive != hashRange.rangeBeginInclusive) {
+    if (rangeBeginInclusive != hashRange.rangeBeginInclusive
+        || rangeEndExclusive != hashRange.rangeEndExclusive
+        || isSkewed != hashRange.isSkewed) {
       return false;
     }
-    return rangeEndExclusive == hashRange.rangeEndExclusive;
+    return true;
   }
 
   @Override
   public int hashCode() {
-    int result = rangeBeginInclusive;
-    result = 31 * result + rangeEndExclusive;
-    return result;
+    return Arrays.hashCode(new Object[] {
+        rangeBeginInclusive,
+        rangeEndExclusive,
+        isSkewed,
+    });
   }
 
-  public void setAsSkewed() {
-    isSkewed = true;
-  }
   public boolean isSkewed() {
     return isSkewed;
   }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
similarity index 97%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
rename to common/src/main/java/edu/snu/nemo/common/KeyRange.java
index 0c46fc5f8..6fcac362f 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/data/KeyRange.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyRange.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.data;
+package edu.snu.nemo.common;
 
 import java.io.Serializable;
 
diff --git a/common/src/main/java/edu/snu/nemo/common/MetricFactory.java b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
new file mode 100644
index 000000000..a98b81b89
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/MetricFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common;
+
+import java.io.Serializable;
+
+/**
+ * A serializable metric factory.
+ *
+ * @param <T> metric type.
+ */
+public interface MetricFactory<T> extends Serializable {
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
new file mode 100644
index 000000000..a5f37a91e
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/DataSkewMetricProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * DataSkewMetric ExecutionProperty.
+ */
+public final class DataSkewMetricProperty extends EdgeExecutionProperty<DataSkewMetricFactory> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private DataSkewMetricProperty(final DataSkewMetricFactory value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static DataSkewMetricProperty of(final DataSkewMetricFactory value) {
+    return new DataSkewMetricProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
index 67fc42dc1..09ebfbff5 100644
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/IRVertex.java
@@ -29,6 +29,7 @@
  */
 public abstract class IRVertex extends Vertex {
   private final ExecutionPropertyMap<VertexExecutionProperty> executionProperties;
+  private boolean stagePartitioned;
 
   /**
    * Constructor of IRVertex.
@@ -36,6 +37,7 @@
   public IRVertex() {
     super(IdManager.newVertexId());
     this.executionProperties = ExecutionPropertyMap.of(this);
+    this.stagePartitioned = false;
   }
 
   /**
@@ -89,6 +91,13 @@ public final IRVertex setPropertyPermanently(final VertexExecutionProperty<?> ex
     return executionProperties;
   }
 
+  public final void setStagePartitioned() {
+    stagePartitioned = true;
+  }
+  public final boolean getStagePartitioned() {
+    return stagePartitioned;
+  }
+
   /**
    * @return IRVertex properties in String form.
    */
diff --git a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
index 3e933a2e5..da7adb66d 100644
--- a/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
+++ b/compiler/backend/src/main/java/edu/snu/nemo/compiler/backend/nemo/NemoBackend.java
@@ -61,6 +61,6 @@ public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG) throws Exception
   public PhysicalPlan compile(final DAG<IRVertex, IREdge> irDAG,
                               final PhysicalPlanGenerator physicalPlanGenerator) {
     final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(irDAG);
-    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), stageDAG);
+    return new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), irDAG, stageDAG);
   }
 }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
new file mode 100644
index 000000000..c96ac0bc0
--- /dev/null
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/DefaultMetricPass.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
+
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Pass for initiating IREdge Metric ExecutionProperty with default key range.
+ */
+public final class DefaultMetricPass extends AnnotatingPass {
+  /**
+   * Default constructor.
+   */
+  public DefaultMetricPass() {
+    super(DataSkewMetricProperty.class);
+  }
+
+  @Override
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
+    dag.topologicalDo(dst ->
+      dag.getIncomingEdgesOf(dst).forEach(edge -> {
+        if (CommunicationPatternProperty.Value.Shuffle
+            .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
+          final int parallelism = dst.getPropertyValue(ParallelismProperty.class).get();
+          final Map<Integer, KeyRange> metric = new HashMap<>();
+          for (int i = 0; i < parallelism; i++) {
+            metric.put(i, HashRange.of(i, i + 1, false));
+          }
+          edge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+        }
+      }));
+    return dag;
+  }
+}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index e2ac70fac..c5bc1d9d5 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -53,10 +53,16 @@ private boolean hasMetricCollectionBarrierVertexAsParent(final DAG<IRVertex, IRE
         .filter(v -> v instanceof MetricCollectionBarrierVertex)
         .forEach(v -> v.setProperty(DynamicOptimizationProperty
             .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+
     dag.getVertices().stream()
         .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
-            && !v.getPropertyValue(ResourceSkewedDataProperty.class).isPresent())
-        .forEach(v -> v.setProperty(ResourceSkewedDataProperty.of(true)));
+            && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
+        .forEach(childV -> {
+          childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+          dag.getDescendants(childV.getId()).forEach(descendentV -> {
+            descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
+          });
+        });
 
     return dag;
   }
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
index 4dc16d42d..6bb119b05 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/DefaultCompositePass.java
@@ -31,6 +31,7 @@
   public DefaultCompositePass() {
     super(Arrays.asList(
         new DefaultParallelismPass(),
+        new DefaultMetricPass(),
         new DefaultEdgeEncoderPass(),
         new DefaultEdgeDecoderPass(),
         new DefaultDataStorePass(),
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index 9d7eb2972..acb8d50e5 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -44,14 +44,6 @@ public DataSkewPolicy() {
     this.policy = BUILDER.build();
   }
 
-  public DataSkewPolicy(final int skewness) {
-    this.policy = new PolicyBuilder()
-        .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(skewness), new SkewCompositePass())
-        .registerCompileTimePass(new LoopOptimizationCompositePass())
-        .registerCompileTimePass(new DefaultCompositePass())
-        .build();
-  }
-
   @Override
   public DAG<IRVertex, IREdge> runCompileTimeOptimization(final DAG<IRVertex, IREdge> dag, final String dagDirectory)
       throws Exception {
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
index f75cb8c62..79191ba8c 100644
--- a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -26,19 +26,19 @@
 public final class PolicyBuilderTest {
   @Test
   public void testDisaggregationPolicy() {
-    assertEquals(17, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(18, DisaggregationPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, DisaggregationPolicy.BUILDER.getRuntimePasses().size());
   }
 
   @Test
   public void testTransientResourcePolicy() {
-    assertEquals(19, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(20, TransientResourcePolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(0, TransientResourcePolicy.BUILDER.getRuntimePasses().size());
   }
 
   @Test
   public void testDataSkewPolicy() {
-    assertEquals(21, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(22, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(1, DataSkewPolicy.BUILDER.getRuntimePasses().size());
   }
 
diff --git a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
index 871bf4411..d26adaf17 100644
--- a/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
+++ b/conf/src/main/java/edu/snu/nemo/conf/JobConf.java
@@ -126,7 +126,6 @@
   }
 
   /**
-<<<<<<< HEAD
    * Max number of attempts for task scheduling.
    */
   @NamedParameter(doc = "Max number of task attempts", short_name = "max_task_attempt", default_value = "1")
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
index 2e2e61619..049ccbe26 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/RuntimeIdGenerator.java
@@ -43,9 +43,10 @@ private RuntimeIdGenerator() {
    * Generates the ID for physical plan.
    *
    * @return the generated ID
+   * TODO #100: Refactor string-based RuntimeIdGenerator for IR-based DynOpt
    */
   public static String generatePhysicalPlanId() {
-    return "Plan-" + physicalPlanIdGenerator.getAndIncrement();
+    return "Plan-" + physicalPlanIdGenerator.get();
   }
 
   /**
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
index 50df79915..9a8b26a04 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEvent.java
@@ -19,7 +19,7 @@
 package edu.snu.nemo.runtime.common.eventhandler;
 
 import edu.snu.nemo.common.eventhandler.RuntimeEvent;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 /**
@@ -27,25 +27,27 @@
  */
 public final class DynamicOptimizationEvent implements RuntimeEvent {
   private final PhysicalPlan physicalPlan;
-  private final MetricCollectionBarrierVertex metricCollectionBarrierVertex;
+  private final Object dynOptData;
   private final String taskId;
   private final String executorId;
+  private final IREdge targetEdge;
 
   /**
    * Default constructor.
    * @param physicalPlan physical plan to be optimized.
-   * @param metricCollectionBarrierVertex metric collection barrier vertex to retrieve metric data from.
    * @param taskId id of the task which triggered the dynamic optimization.
    * @param executorId the id of executor which executes {@code taskId}
    */
   public DynamicOptimizationEvent(final PhysicalPlan physicalPlan,
-                                  final MetricCollectionBarrierVertex metricCollectionBarrierVertex,
+                                  final Object dynOptData,
                                   final String taskId,
-                                  final String executorId) {
+                                  final String executorId,
+                                  final IREdge targetEdge) {
     this.physicalPlan = physicalPlan;
-    this.metricCollectionBarrierVertex = metricCollectionBarrierVertex;
     this.taskId = taskId;
+    this.dynOptData = dynOptData;
     this.executorId = executorId;
+    this.targetEdge = targetEdge;
   }
 
   /**
@@ -56,14 +58,7 @@ public PhysicalPlan getPhysicalPlan() {
   }
 
   /**
-   * @return the metric collection barrier vertex for the dynamic optimization.
-   */
-  public MetricCollectionBarrierVertex getMetricCollectionBarrierVertex() {
-    return this.metricCollectionBarrierVertex;
-  }
-
-  /**
-   * @return id of the task which triggered the dynamic optimization
+   * @return id of the task which triggered the dynamic optimization.
    */
   public String getTaskId() {
     return taskId;
@@ -75,4 +70,12 @@ public String getTaskId() {
   public String getExecutorId() {
     return executorId;
   }
+
+  public Object getDynOptData() {
+    return this.dynOptData;
+  }
+
+  public IREdge getTargetEdge() {
+    return this.targetEdge;
+  }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
index 56ce7c141..a642248cf 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/eventhandler/DynamicOptimizationEventHandler.java
@@ -20,7 +20,7 @@
 
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.runtime.common.optimizer.RunTimeOptimizer;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.reef.wake.impl.PubSubEventHandler;
@@ -50,11 +50,10 @@ private DynamicOptimizationEventHandler(final PubSubEventHandlerWrapper pubSubEv
   @Override
   public void onNext(final DynamicOptimizationEvent dynamicOptimizationEvent) {
     final PhysicalPlan physicalPlan = dynamicOptimizationEvent.getPhysicalPlan();
-    final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-            dynamicOptimizationEvent.getMetricCollectionBarrierVertex();
+    final Object dynOptData = dynamicOptimizationEvent.getDynOptData();
+    final IREdge targetEdge = dynamicOptimizationEvent.getTargetEdge();
 
-    final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan,
-        metricCollectionBarrierVertex);
+    final PhysicalPlan newPlan = RunTimeOptimizer.dynamicOptimization(physicalPlan, dynOptData, targetEdge);
 
     pubSubEventHandler.onNext(new UpdatePhysicalPlanEvent(newPlan, dynamicOptimizationEvent.getTaskId(),
         dynamicOptimizationEvent.getExecutorId()));
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index e251c983c..30ad95846 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -16,10 +16,17 @@
 package edu.snu.nemo.runtime.common.optimizer;
 
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlanGenerator;
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.plan.StageEdge;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
 
 import java.util.*;
 
@@ -36,24 +43,27 @@ private RunTimeOptimizer() {
   /**
    * Dynamic optimization method to process the dag with an appropriate pass, decided by the stats.
    * @param originalPlan original physical execution plan.
-   * @param metricCollectionBarrierVertex the vertex that collects metrics and chooses which optimization to perform.
    * @return the newly updated optimized physical plan.
    */
   public static synchronized PhysicalPlan dynamicOptimization(
           final PhysicalPlan originalPlan,
-          final MetricCollectionBarrierVertex metricCollectionBarrierVertex) {
-    final DynamicOptimizationProperty.Value dynamicOptimizationType =
-        metricCollectionBarrierVertex.getPropertyValue(DynamicOptimizationProperty.class).get();
+          final Object dynOptData,
+          final IREdge targetEdge) {
+    try {
+      final PhysicalPlanGenerator physicalPlanGenerator =
+          Tang.Factory.getTang().newInjector().getInstance(PhysicalPlanGenerator.class);
 
-    switch (dynamicOptimizationType) {
-      case DataSkewRuntimePass:
-        // Metric data for DataSkewRuntimePass is a pair of blockIds and map of hashrange, partition size.
-        final Pair<List<String>, Map<Integer, Long>> metricData =
-            Pair.of(metricCollectionBarrierVertex.getBlockIds(),
-                (Map<Integer, Long>) metricCollectionBarrierVertex.getMetricData());
-        return new DataSkewRuntimePass().apply(originalPlan, metricData);
-      default:
-        throw new UnsupportedOperationException("Unknown runtime pass: " + dynamicOptimizationType);
+      // Data for dynamic optimization used in DataSkewRuntimePass
+      // is a map of <hash value, partition size>.
+      final DAG<IRVertex, IREdge> newIrDAG =
+          new DataSkewRuntimePass()
+              .apply(originalPlan.getIrDAG(), Pair.of(targetEdge, (Map<Integer, Long>) dynOptData));
+      final DAG<Stage, StageEdge> stageDAG = physicalPlanGenerator.apply(newIrDAG);
+      final PhysicalPlan physicalPlan =
+          new PhysicalPlan(RuntimeIdGenerator.generatePhysicalPlanId(), newIrDAG, stageDAG);
+      return physicalPlan;
+    } catch (final InjectionException e) {
+      throw new RuntimeException(e);
     }
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 3218317dc..67afbad6a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -16,18 +16,18 @@
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
-import edu.snu.nemo.common.exception.DynamicOptimizationException;
-
-import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
-import edu.snu.nemo.runtime.common.plan.Stage;
-import edu.snu.nemo.runtime.common.plan.StageEdge;
-import edu.snu.nemo.runtime.common.data.HashRange;
+
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,19 +41,24 @@
  * this RuntimePass identifies a number of keys with big partition sizes(skewed key)
  * and evenly redistributes data via overwriting incoming edges of destination tasks.
  */
-public final class DataSkewRuntimePass extends RuntimePass<Pair<List<String>, Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<IREdge, Map<Integer, Long>>> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
   // Skewed keys denote for top n keys in terms of partition size.
   public static final int DEFAULT_NUM_SKEWED_KEYS = 3;
-  private int numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+  private int numSkewedKeys;
 
   /**
    * Constructor.
    */
   public DataSkewRuntimePass() {
-    this.eventHandlers = Collections.singleton(
-        DynamicOptimizationEventHandler.class);
+    this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
+    this.numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+  }
+
+  public DataSkewRuntimePass(final int numOfSkewedKeys) {
+    this();
+    this.numSkewedKeys = numOfSkewedKeys;
   }
 
   public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
@@ -67,44 +72,32 @@ public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
   }
 
   @Override
-  public PhysicalPlan apply(final PhysicalPlan originalPlan,
-                            final Pair<List<String>, Map<Integer, Long>> metricData) {
-    // Builder to create new stages.
-    final DAGBuilder<Stage, StageEdge> physicalDAGBuilder =
-        new DAGBuilder<>(originalPlan.getStageDAG());
-    final List<String> blockIds = metricData.left();
-
+  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> irDAG,
+                                     final Pair<IREdge, Map<Integer, Long>> metricData) {
     // get edges to optimize
-    final List<String> optimizationEdgeIds = blockIds.stream().map(blockId ->
-        RuntimeIdGenerator.getRuntimeEdgeIdFromBlockId(blockId)).collect(Collectors.toList());
-    final DAG<Stage, StageEdge> stageDAG = originalPlan.getStageDAG();
-    final List<StageEdge> optimizationEdges = stageDAG.getVertices().stream()
-        .flatMap(stage -> stageDAG.getIncomingEdgesOf(stage).stream())
-        .filter(stageEdge -> optimizationEdgeIds.contains(stageEdge.getId()))
+    final List<IREdge> optimizationEdges = irDAG.getVertices().stream()
+        .flatMap(v -> irDAG.getIncomingEdgesOf(v).stream())
+        .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
+            .equals(e.getPropertyValue(MetricCollectionProperty.class)))
         .collect(Collectors.toList());
 
+    final IREdge targetEdge = metricData.left();
     // Get number of evaluators of the next stage (number of blocks).
-    final Integer numOfDstTasks = optimizationEdges.stream().findFirst().orElseThrow(() ->
-        new RuntimeException("optimization edges are empty")).getDst().getTaskIds().size();
+    final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).get();
 
     // Calculate keyRanges.
-    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), numOfDstTasks);
-
+    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
+    final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
+    for (int i = 0; i < dstParallelism; i++) {
+      taskIdxToKeyRange.put(i, keyRanges.get(i));
+    }
     // Overwrite the previously assigned key range in the physical DAG with the new range.
-    optimizationEdges.forEach(optimizationEdge -> {
-      // Update the information.
-      final Map<Integer, KeyRange> taskIdxToHashRange = new HashMap<>();
-      for (int taskIdx = 0; taskIdx < numOfDstTasks; taskIdx++) {
-        taskIdxToHashRange.put(taskIdx, keyRanges.get(taskIdx));
-      }
-      optimizationEdge.setTaskIdxToKeyRange(taskIdxToHashRange);
-    });
-
-    return new PhysicalPlan(originalPlan.getId(), physicalDAGBuilder.build());
+    targetEdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+    return irDAG;
   }
 
   public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
-    // Identify skewed keyes.
+    // Identify skewed keys.
     List<Map.Entry<Integer, Long>> sortedMetricData = keyValToPartitionSizeMap.entrySet().stream()
         .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
         .collect(Collectors.toList());
@@ -134,31 +127,31 @@ private boolean containsSkewedKey(final List<Integer> skewedKeys,
    * to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
    *
    * @param keyToPartitionSizeMap a map of key to partition size.
-   * @param numOfDstTasks the number of tasks that receives this data as input.
+   * @param dstParallelism the number of tasks that receive this data as input.
    * @return the list of key ranges calculated.
    */
   @VisibleForTesting
   public List<KeyRange> calculateKeyRanges(final Map<Integer, Long> keyToPartitionSizeMap,
-                                           final Integer numOfDstTasks) {
-    // Get the biggest key.
-    final int maxKey = keyToPartitionSizeMap.keySet().stream()
+                                           final Integer dstParallelism) {
+    // Get the last key.
+    final int lastKey = keyToPartitionSizeMap.keySet().stream()
         .max(Integer::compareTo)
-        .orElseThrow(() -> new DynamicOptimizationException("Cannot find max key among blocks."));
+        .get();
 
     // Identify skewed keys, which is top numSkewedKeys number of keys.
     List<Integer> skewedKeys = identifySkewedKeys(keyToPartitionSizeMap);
 
     // Calculate the ideal size for each destination task.
     final Long totalSize = keyToPartitionSizeMap.values().stream().mapToLong(n -> n).sum(); // get total size
-    final Long idealSizePerTask = totalSize / numOfDstTasks; // and derive the ideal size per task
+    final Long idealSizePerTask = totalSize / dstParallelism; // and derive the ideal size per task
 
-    final List<KeyRange> keyRanges = new ArrayList<>(numOfDstTasks);
+    final List<KeyRange> keyRanges = new ArrayList<>(dstParallelism);
     int startingKey = 0;
     int finishingKey = 1;
     Long currentAccumulatedSize = keyToPartitionSizeMap.getOrDefault(startingKey, 0L);
     Long prevAccumulatedSize = 0L;
-    for (int i = 1; i <= numOfDstTasks; i++) {
-      if (i != numOfDstTasks) {
+    for (int i = 1; i <= dstParallelism; i++) {
+      if (i != dstParallelism) {
         // Ideal accumulated partition size for this task.
         final Long idealAccumulatedSize = idealSizePerTask * i;
         // By adding partition sizes, find the accumulated size nearest to the given ideal size.
@@ -185,15 +178,15 @@ private boolean containsSkewedKey(final List<Integer> skewedKeys,
         prevAccumulatedSize = currentAccumulatedSize;
         startingKey = finishingKey;
       } else { // last one: we put the range of the rest.
-        boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+        boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, lastKey + 1);
         keyRanges.add(i - 1,
-            HashRange.of(startingKey, maxKey + 1, isSkewedKey));
+            HashRange.of(startingKey, lastKey + 1, isSkewedKey));
 
-        while (finishingKey <= maxKey) {
+        while (finishingKey <= lastKey) {
           currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
           finishingKey++;
         }
-        LOG.debug("KeyRange {}~{}, Size {}", startingKey, maxKey + 1,
+        LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey + 1,
             currentAccumulatedSize - prevAccumulatedSize);
       }
     }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
index 249a239b3..5a8471fba 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/RuntimePass.java
@@ -15,9 +15,11 @@
  */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
+import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.RuntimeEventHandler;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.pass.Pass;
-import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 
 import java.util.Set;
 import java.util.function.BiFunction;
@@ -28,7 +30,8 @@
  * after dynamic optimization.
  * @param <T> type of the metric data used for dynamic optimization.
  */
-public abstract class RuntimePass<T> extends Pass implements BiFunction<PhysicalPlan, T, PhysicalPlan> {
+public abstract class RuntimePass<T> extends Pass
+    implements BiFunction<DAG<IRVertex, IREdge>, T, DAG<IRVertex, IREdge>> {
   /**
    * @return the set of event handlers used with the runtime pass.
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
index 30f71110d..4286bca0c 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlan.java
@@ -16,6 +16,7 @@
 package edu.snu.nemo.runtime.common.plan;
 
 import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.Serializable;
@@ -27,6 +28,7 @@
  */
 public final class PhysicalPlan implements Serializable {
   private final String id;
+  private final DAG<IRVertex, IREdge> irDAG;
   private final DAG<Stage, StageEdge> stageDAG;
   private final Map<String, IRVertex> idToIRVertex;
 
@@ -37,8 +39,10 @@
    * @param stageDAG        the DAG of stages.
    */
   public PhysicalPlan(final String id,
+                      final DAG<IRVertex, IREdge> irDAG,
                       final DAG<Stage, StageEdge> stageDAG) {
     this.id = id;
+    this.irDAG = irDAG;
     this.stageDAG = stageDAG;
 
     idToIRVertex = new HashMap<>();
@@ -70,6 +74,13 @@ public String getId() {
     return idToIRVertex;
   }
 
+  /**
+   * @return IR DAG.
+   */
+  public DAG<IRVertex, IREdge> getIrDAG() {
+    return irDAG;
+  }
+
   @Override
   public String toString() {
     return stageDAG.toString();
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index ca4124bad..16f4bc4ec 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -34,6 +34,8 @@
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.reef.tang.annotations.Parameter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
 import java.util.*;
@@ -45,6 +47,7 @@
 public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdge>, DAG<Stage, StageEdge>> {
   private final String dagDirectory;
   private final StagePartitioner stagePartitioner;
+  private static final Logger LOG = LoggerFactory.getLogger(PhysicalPlanGenerator.class.getName());
 
   /**
    * Private constructor.
@@ -147,7 +150,7 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
 
       final DAGBuilder<IRVertex, RuntimeEdge<IRVertex>> stageInternalDAGBuilder = new DAGBuilder<>();
 
-      // Prepare vertexIdtoReadables
+      // Prepare vertexIdToReadables
       final List<Map<String, Readable>> vertexIdToReadables = new ArrayList<>(stageParallelism);
       for (int i = 0; i < stageParallelism; i++) {
         vertexIdToReadables.add(new HashMap<>());
@@ -156,7 +159,7 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
       // For each IRVertex,
       for (final IRVertex irVertex : stageVertices) {
         // Take care of the readables of a source vertex.
-        if (irVertex instanceof SourceVertex) {
+        if (irVertex instanceof SourceVertex && !irVertex.getStagePartitioned()) {
           final SourceVertex sourceVertex = (SourceVertex) irVertex;
           try {
             final List<Readable> readables = sourceVertex.getReadables(stageParallelism);
@@ -200,6 +203,12 @@ private void handleDuplicateEdgeGroupProperty(final DAG<Stage, StageEdge> dagOfS
         dagOfStagesBuilder.addVertex(stage);
         stageIdToStageMap.put(stageId, stage);
       }
+
+      // To prevent re-fetching readables in source vertex
+      // during re-generation of physical plan for dynamic optimization.
+      for (IRVertex irVertex : stageVertices) {
+        irVertex.setStagePartitioned();
+      }
     }
 
     // Add StageEdges
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
index 98773eacb..5218b8fea 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StageEdge.java
@@ -21,11 +21,6 @@
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
-import edu.snu.nemo.runtime.common.data.KeyRange;
-import edu.snu.nemo.runtime.common.data.HashRange;
-
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Edge of a stage that connects an IRVertex of the source stage to an IRVertex of the destination stage.
@@ -44,11 +39,6 @@
    */
   private final IRVertex dstVertex;
 
-  /**
-   * The list between the task idx and key range to read.
-   */
-  private Map<Integer, KeyRange> taskIdxToKeyRange;
-
   /**
    * Value for {@link CommunicationPatternProperty}.
    */
@@ -81,11 +71,6 @@ public StageEdge(final String runtimeEdgeId,
     super(runtimeEdgeId, edgeProperties, srcStage, dstStage, isSideInput);
     this.srcVertex = srcVertex;
     this.dstVertex = dstVertex;
-    // Initialize the key range of each dst task.
-    this.taskIdxToKeyRange = new HashMap<>();
-    for (int taskIdx = 0; taskIdx < dstStage.getTaskIds().size(); taskIdx++) {
-      taskIdxToKeyRange.put(taskIdx, HashRange.of(taskIdx, taskIdx + 1, false));
-    }
     this.dataCommunicationPatternValue = edgeProperties.get(CommunicationPatternProperty.class)
         .orElseThrow(() -> new RuntimeException(String.format(
             "CommunicationPatternProperty not set for %s", runtimeEdgeId)));
@@ -124,22 +109,6 @@ public String toString() {
     return propertiesToJSON();
   }
 
-  /**
-   * @return the list between the task idx and key range to read.
-   */
-  public Map<Integer, KeyRange> getTaskIdxToKeyRange() {
-    return taskIdxToKeyRange;
-  }
-
-  /**
-   * Sets the task idx to key range list.
-   *
-   * @param taskIdxToKeyRange the list to set.
-   */
-  public void setTaskIdxToKeyRange(final Map<Integer, KeyRange> taskIdxToKeyRange) {
-    this.taskIdxToKeyRange = taskIdxToKeyRange;
-  }
-
   /**
    * @return {@link CommunicationPatternProperty} value.
    */
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index abe0a8f01..88ef43264 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -83,7 +83,7 @@ public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty>
           continue;
         }
         // Assign stageId
-        if (testMergability(edge, irDAG)) {
+        if (testMergeability(edge, irDAG)) {
           vertexToStageIdMap.put(connectedIRVertex, stageId);
         } else {
           vertexToStageIdMap.put(connectedIRVertex, nextStageIndex.getValue());
@@ -99,7 +99,7 @@ public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty>
    * @param dag IR DAG which contains {@code edge}
    * @return {@code true} if and only if the source and the destination vertex of the edge can be merged into one stage.
    */
-  private boolean testMergability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
+  private boolean testMergeability(final IREdge edge, final DAG<IRVertex, IREdge> dag) {
     // If the destination vertex has multiple inEdges, return false
     if (dag.getIncomingEdgesOf(edge.getDst()).size() > 1) {
       return false;
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
index 0cfe8633a..3e95830da 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/Task.java
@@ -20,9 +20,7 @@
 import edu.snu.nemo.common.ir.executionproperty.VertexExecutionProperty;
 
 import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 
 /**
  * A Task is a self-contained executable that can be executed on a machine.
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 59a321285..911990ec1 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -117,8 +117,6 @@ message BlockStateChangedMsg {
 message DataSizeMetricMsg {
     // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
     repeated PartitionSizeEntry partitionSize = 1;
-    required string blockId = 2;
-    required string srcIRVertexId = 3;
 }
 
 message PartitionSizeEntry {
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 319b8babe..04fca2bb1 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -15,8 +15,8 @@
  */
 package edu.snu.nemo.runtime.common.optimizer.pass.runtime;
 
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import org.junit.Before;
 import org.junit.Test;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index ba0a71cf4..4cf0b12e5 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -26,7 +26,7 @@
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.comm.ControlMessage.ByteTransferContextDescriptor;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -296,8 +296,6 @@ public void writeBlock(final Block block,
               .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
               .setType(ControlMessage.MessageType.DataSizeMetric)
               .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
-                  .setBlockId(blockId)
-                  .setSrcIRVertexId(srcIRVertexId)
                   .addAllPartitionSize(partitionSizeEntries)
               )
               .build());
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
index e0de210ae..960f2c556 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/Block.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
 
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index 6eef82432..be42f6bae 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -18,7 +18,7 @@
 import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.*;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.Partition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
index 5bf1e0195..7722f2d8f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/NonSerializedMemoryBlock.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
index 847558fc8..03a7ab1a9 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/SerializedMemoryBlock.java
@@ -17,7 +17,7 @@
 
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.BlockWriteException;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
 import edu.snu.nemo.runtime.executor.data.partition.NonSerializedPartition;
 import edu.snu.nemo.runtime.executor.data.partition.SerializedPartition;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
index 4471ae4d3..987025996 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/InputReader.java
@@ -16,6 +16,8 @@
 package edu.snu.nemo.runtime.executor.datatransfer;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProperty;
@@ -23,16 +25,14 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.common.exception.BlockFetchException;
 import edu.snu.nemo.common.exception.UnsupportedCommPatternException;
-import edu.snu.nemo.runtime.common.data.HashRange;
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.runtime.executor.data.BlockManagerWorker;
 import edu.snu.nemo.runtime.executor.data.DataUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -45,7 +45,6 @@
  * Represents the input data transfer to a task.
  */
 public final class InputReader extends DataTransfer {
-  private static final Logger LOG = LoggerFactory.getLogger(InputReader.class.getName());
   private final int dstTaskIndex;
   private final BlockManagerWorker blockManagerWorker;
 
@@ -118,8 +117,9 @@ public InputReader(final int dstTaskIndex,
     assert (runtimeEdge instanceof StageEdge);
     final Optional<DataStoreProperty.Value> dataStoreProperty
         = runtimeEdge.getPropertyValue(DataStoreProperty.class);
-    final KeyRange hashRangeToRead =
-        ((StageEdge) runtimeEdge).getTaskIdxToKeyRange().get(dstTaskIndex);
+    final DataSkewMetricFactory metricFactory =
+        (DataSkewMetricFactory) runtimeEdge.getExecutionProperties().get(DataSkewMetricProperty.class).get();
+    final KeyRange hashRangeToRead = metricFactory.getMetric().get(dstTaskIndex);
     if (hashRangeToRead == null) {
       throw new BlockFetchException(
           new Throwable("The hash range to read is not assigned to " + dstTaskIndex + "'th task"));
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
index 8f1b7e552..61adee2c4 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockStoreTest.java
@@ -20,8 +20,8 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.CompressionProperty;
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageDispatcher;
 import edu.snu.nemo.runtime.common.message.local.LocalMessageEnvironment;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
index c05745d28..46c6ce937 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/data/BlockTest.java
@@ -15,9 +15,9 @@
  */
 package edu.snu.nemo.runtime.executor.data;
 
+import edu.snu.nemo.common.HashRange;
 import edu.snu.nemo.common.coder.IntDecoderFactory;
 import edu.snu.nemo.common.coder.IntEncoderFactory;
-import edu.snu.nemo.runtime.common.data.HashRange;
 import edu.snu.nemo.runtime.executor.data.block.Block;
 import edu.snu.nemo.runtime.executor.data.block.FileBlock;
 import edu.snu.nemo.runtime.executor.data.block.NonSerializedMemoryBlock;
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index c755543af..229f30de7 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,6 +15,9 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -300,6 +303,15 @@ private void writeAndRead(final BlockManagerWorker sender,
     dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
     dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
+    if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.Shuffle)) {
+      final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+      final Map<Integer, KeyRange> metric = new HashMap<>();
+      for (int i = 0; i < parallelism; i++) {
+        metric.put(i, HashRange.of(i, i + 1, false));
+      }
+      dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+    }
     final ExecutionPropertyMap edgeProperties = dummyIREdge.getExecutionProperties();
     final RuntimeEdge dummyEdge;
 
@@ -383,6 +395,15 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
         = dummyIREdge.getPropertyValue(DuplicateEdgeGroupProperty.class);
     duplicateDataProperty.get().setRepresentativeEdgeId(edgeId);
     duplicateDataProperty.get().setGroupSize(2);
+    if (dummyIREdge.getPropertyValue(CommunicationPatternProperty.class).get()
+        .equals(CommunicationPatternProperty.Value.Shuffle)) {
+      final int parallelism = dstVertex.getPropertyValue(ParallelismProperty.class).get();
+      final Map<Integer, KeyRange> metric = new HashMap<>();
+      for (int i = 0; i < parallelism; i++) {
+        metric.put(i, HashRange.of(i, i + 1, false));
+      }
+      dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(metric)));
+    }
     dummyIREdge.setProperty(DataStoreProperty.of(store));
     dummyIREdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
     final RuntimeEdge dummyEdge, dummyEdge2;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
new file mode 100644
index 000000000..47671ed54
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import edu.snu.nemo.runtime.common.comm.ControlMessage;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Handler for aggregating data used in data skew dynamic optimization.
+ */
+public class DataSkewDynOptDataHandler implements DynOptDataHandler {
+  private final Map<Integer, Long> aggregatedDynOptData;
+
+  public DataSkewDynOptDataHandler() {
+    this.aggregatedDynOptData = new HashMap<>();
+  }
+
+  /**
+   * Updates data for dynamic optimization sent from Tasks.
+   * @param dynOptData data used for data skew dynamic optimization.
+   */
+  @Override
+  public final void updateDynOptData(final Object dynOptData) {
+    List<ControlMessage.PartitionSizeEntry> partitionSizeInfo
+        = (List<ControlMessage.PartitionSizeEntry>) dynOptData;
+    partitionSizeInfo.forEach(partitionSizeEntry -> {
+      final int hashIndex = partitionSizeEntry.getKey();
+      final long partitionSize = partitionSizeEntry.getSize();
+      if (aggregatedDynOptData.containsKey(hashIndex)) {
+        aggregatedDynOptData.compute(hashIndex, (originalKey, originalValue) -> originalValue + partitionSize);
+      } else {
+        aggregatedDynOptData.put(hashIndex, partitionSize);
+      }
+    });
+  }
+
+  /**
+   * Returns aggregated data for dynamic optimization.
+   * @return aggregated data used for data skew dynamic optimization.
+   */
+  @Override
+  public final Object getDynOptData() {
+    return aggregatedDynOptData;
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
new file mode 100644
index 000000000..66fa31010
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DynOptDataHandler.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+/**
+ * Handler for aggregating data used in dynamic optimization.
+ */
+public interface DynOptDataHandler {
+  /**
+   * Updates data for dynamic optimization sent from Tasks.
+   * @param dynOptData data used for dynamic optimization.
+   */
+  void updateDynOptData(Object dynOptData);
+
+  /**
+   * Returns aggregated data for dynamic optimization.
+   * @return aggregated data used for dynamic optimization.
+   */
+  Object getDynOptData();
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 41ad24310..d1fa2df42 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -19,13 +19,13 @@
 import edu.snu.nemo.conf.JobConf;
 import edu.snu.nemo.common.exception.*;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageContext;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.scheduler.BatchScheduler;
 import edu.snu.nemo.runtime.master.servlet.*;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -82,8 +82,6 @@
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
   private final MetricStore metricStore;
-  private final Map<Integer, Long> aggregatedMetricData;
-  private final ExecutorService metricAggregationService;
   private final ClientRPC clientRPC;
   private final MetricManagerMaster metricManagerMaster;
   // For converting json data. This is a thread safe.
@@ -123,8 +121,6 @@ private RuntimeMaster(final Scheduler scheduler,
     this.irVertices = new HashSet<>();
     this.resourceRequestCount = new AtomicInteger(0);
     this.objectMapper = new ObjectMapper();
-    this.aggregatedMetricData = new ConcurrentHashMap<>();
-    this.metricAggregationService = Executors.newFixedThreadPool(10);
     this.metricStore = MetricStore.getStore();
     this.metricServer = startRestMetricServer();
   }
@@ -343,10 +339,8 @@ private void handleControlMessage(final ControlMessage.Message message) {
         LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
         throw new RuntimeException(exception);
       case DataSizeMetric:
-        final ControlMessage.DataSizeMetricMsg dataSizeMetricMsg = message.getDataSizeMetricMsg();
         // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
-        accumulateBarrierMetric(dataSizeMetricMsg.getPartitionSizeList(),
-            dataSizeMetricMsg.getSrcIRVertexId(), dataSizeMetricMsg.getBlockId());
+        ((BatchScheduler) scheduler).updateDynOptData(message.getDataSizeMetricMsg().getPartitionSizeList());
         break;
       case MetricMessageReceived:
         final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
@@ -371,45 +365,6 @@ private void handleControlMessage(final ControlMessage.Message message) {
     }
   }
 
-  /**
-   * Accumulates the metric data for a barrier vertex.
-   * TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
-   * TODO #98: Implement MetricVertex that collect metric used for dynamic optimization.
-   *
-   * @param partitionSizeInfo the size of partitions in a block to accumulate.
-   * @param srcVertexId       the ID of the source vertex.
-   * @param blockId           the ID of the block.
-   */
-  private void accumulateBarrierMetric(final List<ControlMessage.PartitionSizeEntry> partitionSizeInfo,
-                                       final String srcVertexId,
-                                       final String blockId) {
-    final IRVertex vertexToSendMetricDataTo = irVertices.stream()
-        .filter(irVertex -> irVertex.getId().equals(srcVertexId)).findFirst()
-        .orElseThrow(() -> new RuntimeException(srcVertexId + " doesn't exist in the submitted Physical Plan"));
-
-    if (vertexToSendMetricDataTo instanceof MetricCollectionBarrierVertex) {
-      final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex =
-          (MetricCollectionBarrierVertex) vertexToSendMetricDataTo;
-
-      metricCollectionBarrierVertex.addBlockId(blockId);
-      metricAggregationService.submit(() -> {
-        // For each hash range index, we aggregate the metric data.
-        partitionSizeInfo.forEach(partitionSizeEntry -> {
-          final int key = partitionSizeEntry.getKey();
-          final long size = partitionSizeEntry.getSize();
-          if (aggregatedMetricData.containsKey(key)) {
-            aggregatedMetricData.compute(key, (existKey, existValue) -> existValue + size);
-          } else {
-            aggregatedMetricData.put(key, size);
-          }
-        });
-        metricCollectionBarrierVertex.setMetricData(aggregatedMetricData);
-      });
-    } else {
-      throw new RuntimeException("Something wrong happened at SkewCompositePass.");
-    }
-  }
-
   private static TaskState.State convertTaskState(final ControlMessage.TaskStateFromExecutor state) {
     switch (state) {
       case READY:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
index d0e21d8b9..d5c2e89b9 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/resource/ExecutorRepresenter.java
@@ -44,7 +44,6 @@
  */
 @NotThreadSafe
 public final class ExecutorRepresenter {
-
   private final String executorId;
   private final ResourceSpecification resourceSpecification;
   private final Map<String, Task> runningComplyingTasks;
@@ -107,7 +106,6 @@ public void onTaskScheduled(final Task task) {
         ? runningComplyingTasks : runningNonComplyingTasks).put(task.getTaskId(), task);
     runningTaskToAttempt.put(task, task.getAttemptIdx());
     failedTasks.remove(task);
-
     serializationExecutorService.submit(() -> {
       final byte[] serialized = SerializationUtils.serialize(task);
       sendControlMessage(
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 24c0e435f..77881c05b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -17,18 +17,20 @@
 
 import com.google.common.collect.Sets;
 import edu.snu.nemo.common.Pair;
-import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.Readable;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.eventhandler.DynamicOptimizationEvent;
 import edu.snu.nemo.runtime.common.plan.*;
 import edu.snu.nemo.runtime.common.state.BlockState;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.DataSkewDynOptDataHandler;
+import edu.snu.nemo.runtime.master.DynOptDataHandler;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
 import edu.snu.nemo.common.exception.*;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.runtime.common.state.StageState;
 import edu.snu.nemo.runtime.master.BlockManagerMaster;
 import edu.snu.nemo.runtime.master.PlanStateManager;
@@ -75,6 +77,7 @@
   private PhysicalPlan physicalPlan;
   private PlanStateManager planStateManager;
   private List<List<Stage>> sortedScheduleGroups;
+  private List<DynOptDataHandler> dynOptDataHandlers;
 
   @Inject
   private BatchScheduler(final TaskDispatcher taskDispatcher,
@@ -93,6 +96,8 @@ private BatchScheduler(final TaskDispatcher taskDispatcher,
           .subscribe(updatePhysicalPlanEventHandler.getEventClass(), updatePhysicalPlanEventHandler);
     }
     this.executorRegistry = executorRegistry;
+    this.dynOptDataHandlers = new ArrayList<>();
+    dynOptDataHandlers.add(new DataSkewDynOptDataHandler());
   }
 
   /**
@@ -130,14 +135,12 @@ public void updatePlan(final String planId, final PhysicalPlan newPhysicalPlan)
    * Handles task state transition notifications sent from executors.
    * Note that we can receive notifications for previous task attempts, due to the nature of asynchronous events.
    * We ignore such late-arriving notifications, and only handle notifications for the current task attempt.
-   *
    * @param executorId the id of the executor where the message was sent from.
    * @param taskId whose state has changed
    * @param taskAttemptIndex of the task whose state has changed
    * @param newState the state to change to
    * @param vertexPutOnHold the ID of vertex that is put on hold. It is null otherwise.
    */
-  @Override
   public void onTaskStateReportFromExecutor(final String executorId,
                                             final String taskId,
                                             final int taskAttemptIndex,
@@ -158,7 +161,7 @@ public void onTaskStateReportFromExecutor(final String executorId,
           onTaskExecutionFailedRecoverable(executorId, taskId, failureCause);
           break;
         case ON_HOLD:
-          onTaskExecutionOnHold(executorId, taskId, vertexPutOnHold);
+          onTaskExecutionOnHold(executorId, taskId);
           break;
         case FAILED:
           throw new UnrecoverableFailureException(new Exception(new StringBuffer().append("The plan failed on Task #")
@@ -364,15 +367,34 @@ private void onTaskExecutionComplete(final String executorId,
     });
   }
 
+  public IREdge getEdgeToOptimize(final String taskId) {
+    // Get a stage including the given task
+    final Stage stagePutOnHold = physicalPlan.getStageDAG().getVertices().stream()
+        .filter(stage -> stage.getTaskIds().contains(taskId)).findFirst().get();
+
+    // Get outgoing edges of that stage with MetricCollectionProperty
+    List<StageEdge> stageEdges = physicalPlan.getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
+    IREdge targetEdge = null;
+    for (StageEdge edge : stageEdges) {
+      final IRVertex srcIRVertex = edge.getSrcIRVertex();
+      final IRVertex dstIRVertex = edge.getDstIRVertex();
+      targetEdge = physicalPlan.getIrDAG().getEdgeBetween(srcIRVertex.getId(), dstIRVertex.getId());
+      if (MetricCollectionProperty.Value.DataSkewRuntimePass
+          .equals(targetEdge.getPropertyValue(MetricCollectionProperty.class).get())) {
+        break;
+      }
+    }
+
+    return targetEdge;
+  }
+
   /**
    * Action for after task execution is put on hold.
    * @param executorId       the ID of the executor.
    * @param taskId           the ID of the task.
-   * @param vertexPutOnHold  the ID of vertex that is put on hold.
    */
   private void onTaskExecutionOnHold(final String executorId,
-                                     final String taskId,
-                                     final String vertexPutOnHold) {
+                                     final String taskId) {
     LOG.info("{} put on hold in {}", new Object[]{taskId, executorId});
     executorRegistry.updateExecutor(executorId, (executor, state) -> {
       executor.onTaskExecutionComplete(taskId);
@@ -383,21 +405,18 @@ private void onTaskExecutionOnHold(final String executorId,
     final boolean stageComplete =
         planStateManager.getStageState(stageIdForTaskUponCompletion).equals(StageState.State.COMPLETE);
 
+    final IREdge targetEdge = getEdgeToOptimize(taskId);
+    if (targetEdge == null) {
+      throw new RuntimeException("No edges specified for data skew optimization");
+    }
+
     if (stageComplete) {
-      // get optimization vertex from the task.
-      final MetricCollectionBarrierVertex metricCollectionBarrierVertex =
-          getVertexDagById(taskId).getVertices().stream() // get vertex list
-              .filter(irVertex -> irVertex.getId().equals(vertexPutOnHold)) // find it
-              .filter(irVertex -> irVertex instanceof MetricCollectionBarrierVertex)
-              .distinct()
-              .map(irVertex -> (MetricCollectionBarrierVertex) irVertex) // convert types
-              .findFirst().orElseThrow(() -> new RuntimeException(TaskState.State.ON_HOLD.name() // get it
-              + " called with failed task ids by some other task than "
-              + MetricCollectionBarrierVertex.class.getSimpleName()));
-      // and we will use this vertex to perform metric collection and dynamic optimization.
-
-      pubSubEventHandlerWrapper.getPubSubEventHandler().onNext(
-          new DynamicOptimizationEvent(physicalPlan, metricCollectionBarrierVertex, taskId, executorId));
+      final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
+          .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+          .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+      pubSubEventHandlerWrapper.getPubSubEventHandler()
+          .onNext(new DynamicOptimizationEvent(physicalPlan, dynOptDataHandler.getDynOptData(),
+              taskId, executorId, targetEdge));
     }
   }
 
@@ -478,16 +497,10 @@ private void retryTasksAndRequiredParents(final Set<String> tasks) {
         .collect(Collectors.toSet());
   }
 
-  /**
-   * @param taskId id of the task
-   * @return the IR dag
-   */
-  private DAG<IRVertex, RuntimeEdge<IRVertex>> getVertexDagById(final String taskId) {
-    for (final Stage stage : physicalPlan.getStageDAG().getVertices()) {
-      if (stage.getId().equals(RuntimeIdGenerator.getStageIdFromTaskId(taskId))) {
-        return stage.getIRDAG();
-      }
-    }
-    throw new RuntimeException("This taskId does not exist in the plan");
+  public void updateDynOptData(final Object dynOptData) {
+    final DynOptDataHandler dynOptDataHandler = dynOptDataHandlers.stream()
+        .filter(dataHandler -> dataHandler instanceof DataSkewDynOptDataHandler)
+        .findFirst().orElseThrow(() -> new RuntimeException("DataSkewDynOptDataHandler is not registered!"));
+    dynOptDataHandler.updateDynOptData(dynOptData);
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
index ff3986e50..a2c0935b9 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/NodeShareSchedulingConstraint.java
@@ -51,7 +51,7 @@ private String getNodeName(final Map<String, Integer> propertyValue, final int t
   @Override
   public boolean testSchedulability(final ExecutorRepresenter executor, final Task task) {
     final Map<String, Integer> propertyValue = task.getPropertyValue(ResourceSiteProperty.class)
-            .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected"));
+        .orElseThrow(() -> new RuntimeException("ResourceSiteProperty expected"));
     if (propertyValue.isEmpty()) {
       return true;
     }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
index 4a774bf7f..50dd8d5d8 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SchedulingConstraintRegistry.java
@@ -45,8 +45,8 @@ private SchedulingConstraintRegistry(
     registerSchedulingConstraint(containerTypeAwareSchedulingConstraint);
     registerSchedulingConstraint(freeSlotSchedulingConstraint);
     registerSchedulingConstraint(sourceLocationAwareSchedulingConstraint);
-    registerSchedulingConstraint(nodeShareSchedulingConstraint);
     registerSchedulingConstraint(skewnessAwareSchedulingConstraint);
+    registerSchedulingConstraint(nodeShareSchedulingConstraint);
   }
 
   /**
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
index 236453fc0..6e5ba4c2d 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraint.java
@@ -16,11 +16,13 @@
 package edu.snu.nemo.runtime.master.scheduler;
 
 import com.google.common.annotations.VisibleForTesting;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
 import edu.snu.nemo.common.ir.executionproperty.AssociatedProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -28,6 +30,7 @@
 
 import javax.annotation.concurrent.ThreadSafe;
 import javax.inject.Inject;
+import java.util.Map;
 
 /**
  * This policy aims to distribute partitions with skewed keys to different executors.
@@ -36,7 +39,6 @@
 @DriverSide
 @AssociatedProperty(ResourceSkewedDataProperty.class)
 public final class SkewnessAwareSchedulingConstraint implements SchedulingConstraint {
-
   @VisibleForTesting
   @Inject
   public SkewnessAwareSchedulingConstraint() {
@@ -45,9 +47,14 @@ public SkewnessAwareSchedulingConstraint() {
   public boolean hasSkewedData(final Task task) {
     final int taskIdx = RuntimeIdGenerator.getIndexFromTaskId(task.getTaskId());
     for (StageEdge inEdge : task.getTaskIncomingEdges()) {
-      final KeyRange hashRange = inEdge.getTaskIdxToKeyRange().get(taskIdx);
-      if (((HashRange) hashRange).isSkewed()) {
-        return true;
+      if (CommunicationPatternProperty.Value.Shuffle
+      .equals(inEdge.getDataCommunicationPattern())) {
+        final Map<Integer, KeyRange> taskIdxToKeyRange =
+            inEdge.getPropertyValue(DataSkewMetricProperty.class).get().getMetric();
+        final KeyRange hashRange = taskIdxToKeyRange.get(taskIdx);
+        if (((HashRange) hashRange).isSkewed()) {
+          return true;
+        }
       }
     }
     return false;
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
index 6c0222c41..e6feaa568 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/TaskDispatcher.java
@@ -131,7 +131,6 @@ private void doScheduleTaskList() {
           planStateManager.onTaskStateChanged(task.getTaskId(), TaskState.State.EXECUTING);
 
           LOG.info("{} scheduled to {}", task.getTaskId(), selectedExecutor.getExecutorId());
-
           // send the task
           selectedExecutor.onTaskScheduled(task);
         } else {
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
index 87c933f4b..e3439d6ab 100644
--- a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/scheduler/SkewnessAwareSchedulingConstraintTest.java
@@ -15,9 +15,16 @@
  */
 package edu.snu.nemo.runtime.master.scheduler;
 
+import edu.snu.nemo.common.DataSkewMetricFactory;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DataSkewMetricProperty;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
-import edu.snu.nemo.runtime.common.data.HashRange;
-import edu.snu.nemo.runtime.common.data.KeyRange;
+import edu.snu.nemo.common.HashRange;
+import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.runtime.common.plan.Stage;
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.Task;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
@@ -36,27 +43,36 @@
  * Test cases for {@link SkewnessAwareSchedulingConstraint}.
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({ExecutorRepresenter.class, Task.class, HashRange.class, StageEdge.class})
+@PrepareForTest({ExecutorRepresenter.class, Task.class, Stage.class, HashRange.class,
+IRVertex.class, IREdge.class})
 public final class SkewnessAwareSchedulingConstraintTest {
 
-  private static StageEdge mockStageEdge() {
+  private static StageEdge mockStageEdge(final int numSkewedHashRange,
+                                         final int numTotalHashRange) {
     final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
 
-    final HashRange skewedHashRange1 = mock(HashRange.class);
-    when(skewedHashRange1.isSkewed()).thenReturn(true);
-    final HashRange skewedHashRange2 = mock(HashRange.class);
-    when(skewedHashRange2.isSkewed()).thenReturn(true);
-    final HashRange hashRange = mock(HashRange.class);
-    when(hashRange.isSkewed()).thenReturn(false);
+    for (int taskIdx = 0; taskIdx < numTotalHashRange; taskIdx++) {
+      final HashRange hashRange = mock(HashRange.class);
+      if (taskIdx < numSkewedHashRange) {
+        when(hashRange.isSkewed()).thenReturn(true);
+      } else {
+        when(hashRange.isSkewed()).thenReturn(false);
+      }
+      taskIdxToKeyRange.put(taskIdx, hashRange);
+    }
 
-    taskIdxToKeyRange.put(0, skewedHashRange1);
-    taskIdxToKeyRange.put(1, skewedHashRange2);
-    taskIdxToKeyRange.put(2, hashRange);
+    final IRVertex srcMockVertex = mock(IRVertex.class);
+    final IRVertex dstMockVertex = mock(IRVertex.class);
+    final Stage srcMockStage = mock(Stage.class);
+    final Stage dstMockStage = mock(Stage.class);
 
-    final StageEdge inEdge = mock(StageEdge.class);
-    when(inEdge.getTaskIdxToKeyRange()).thenReturn(taskIdxToKeyRange);
+    final IREdge dummyIREdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, srcMockVertex, dstMockVertex);
+    dummyIREdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+    dummyIREdge.setProperty(DataSkewMetricProperty.of(new DataSkewMetricFactory(taskIdxToKeyRange)));
+    final StageEdge dummyEdge = new StageEdge("Edge-0", dummyIREdge.getExecutionProperties(),
+        srcMockVertex, dstMockVertex, srcMockStage, dstMockStage, false);
 
-    return inEdge;
+    return dummyEdge;
   }
 
   private static Task mockTask(final int taskIdx, final List<StageEdge> inEdges) {
@@ -81,11 +97,12 @@ private static ExecutorRepresenter mockExecutorRepresenter(final Task task) {
   @Test
   public void testScheduleSkewedTasks() {
     final SchedulingConstraint schedulingConstraint = new SkewnessAwareSchedulingConstraint();
-    final StageEdge inEdge = mockStageEdge();
-    final Task task0 = mockTask(0, Arrays.asList(inEdge));
-    final Task task1 = mockTask(1, Arrays.asList(inEdge));
-    final Task task2 = mockTask(2, Arrays.asList(inEdge));
-    final ExecutorRepresenter e0 = mockExecutorRepresenter(task0);
+    // Create a StageEdge where two out of three are skewed hash ranges.
+    final StageEdge inEdge = mockStageEdge(2, 3);
+    final Task task0 = mockTask(0, Arrays.asList(inEdge));  // skewed task
+    final Task task1 = mockTask(1, Arrays.asList(inEdge));  // skewed task
+    final Task task2 = mockTask(2, Arrays.asList(inEdge));  // non-skewed task
+    final ExecutorRepresenter e0 = mockExecutorRepresenter(task0);  // schedule skewed task to e0
 
     assertEquals(true, schedulingConstraint.testSchedulability(e0, task2));
     assertEquals(false, schedulingConstraint.testSchedulability(e0, task1));
diff --git a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
index 3b3646eb1..3c23dcfec 100644
--- a/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
+++ b/runtime/test/src/main/java/edu/snu/nemo/runtime/common/plan/TestPlanGenerator.java
@@ -95,7 +95,7 @@ private static PhysicalPlan convertIRToPhysical(final DAG<IRVertex, IREdge> irDA
                                                   final Policy policy) throws Exception {
     final DAG<IRVertex, IREdge> optimized = policy.runCompileTimeOptimization(irDAG, EMPTY_DAG_DIRECTORY);
     final DAG<Stage, StageEdge> physicalDAG = PLAN_GENERATOR.apply(optimized);
-    return new PhysicalPlan("TestPlan", physicalDAG);
+    return new PhysicalPlan("TestPlan", irDAG, physicalDAG);
   }
 
   /**


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services