Posted to commits@nemo.apache.org by jo...@apache.org on 2019/01/21 06:05:13 UTC

[incubator-nemo] branch master updated: [NEMO-321] Fix the data skew pass metric mismatch (#188)

This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d0f781  [NEMO-321] Fix the data skew pass metric mismatch (#188)
2d0f781 is described below

commit 2d0f7819088af5a1f8ddda02431c6988cf2c9ae3
Author: Sanha Lee <sa...@gmail.com>
AuthorDate: Mon Jan 21 15:05:08 2019 +0900

    [NEMO-321] Fix the data skew pass metric mismatch (#188)
    
    JIRA: [NEMO-321: Fix the data skew pass metric mismatch](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-321)
    
    **Major changes:**
    - Gives `DataSkewRuntimePass` access to the partitioning logic by moving the `Partitioner` interface and its implementations from the runtime executor to the runtime common module (see the sketch below).
      - The mismatch between the data metric produced by `MetricCollectionVertex` and the data metric understood by `DataSkewRuntimePass` was caused by the partitioning logic being hidden from both components.
    
    **Minor changes to note:**
    - Adds an empty edge from every metric aggregation vertex to the next stage (as a control dependency) to delay the next stage's scheduling. Without it, the scheduling order of these stages is left to chance.
    - Fixes the location of the `MetricCollection` property.
    - Removes the `DynamicOptimization` property (no longer needed).
    - Re-enables skewness-aware scheduling.
    
    **Tests for the changes:**
    - Existing integration tests and unit tests.
    
    **Other comments:**
    - N/A.
    
    Closes #188
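
The heart of the change is that `DataSkewRuntimePass#calculateKeyRanges` now folds the per-data-key counts reported at runtime into counts per partition key, using the same `Partitioner` that the executor uses when writing blocks (see the `DataSkewRuntimePass.java` hunk below). A minimal standalone sketch of that aggregation step, assuming only the relocated `org.apache.nemo.runtime.common.partitioner.Partitioner` interface from this commit:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.nemo.runtime.common.partitioner.Partitioner;

/**
 * Sketch only (not part of the commit): folds per-data-key element counts, as
 * reported by the metric collection vertex, into counts per partition key using
 * the partitioner assigned to the target shuffle edge, so that the optimizer
 * reasons about the same partitions the executor will actually write.
 */
final class PartitionCountSketch {

  private PartitionCountSketch() {
  }

  static Map<Integer, Long> toPartitionCounts(final Map<Object, Long> keyToCountMap,
                                              final Partitioner<Integer> partitioner) {
    final Map<Integer, Long> partitionKeyToCount = new HashMap<>();
    keyToCountMap.forEach((key, count) ->
        partitionKeyToCount.merge(partitioner.partition(key), count, Long::sum));
    return partitionKeyToCount;
  }
}
```

The resulting map can then be expanded into a size-per-partition-key list (missing partition keys counting as zero) and split into key ranges of roughly equal total size, which is what the updated pass does in the diff below.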
---
 .../MetricCollectionProperty.java                  |  2 +-
 .../DynamicOptimizationProperty.java               | 50 --------------
 .../apache/nemo/common/test/EmptyComponents.java   |  9 +--
 .../compiler/frontend/beam/BeamKeyExtractor.java   |  7 +-
 .../annotating/SkewMetricCollectionPass.java       | 65 ------------------
 .../annotating/SkewPartitionerPass.java            | 24 ++-----
 .../annotating/SkewResourceSkewedDataPass.java     | 51 ++++----------
 .../compiletime/composite/SkewCompositePass.java   |  4 +-
 .../pass/compiletime/reshaping/ReshapingPass.java  | 14 ++++
 .../compiletime/reshaping/SkewReshapingPass.java   | 20 ++++--
 .../compiler/optimizer/policy/DataSkewPolicy.java  |  3 +-
 .../compiler/optimizer/policy/PolicyBuilder.java   |  4 ++
 .../composite/SkewCompositePassTest.java           | 11 ++-
 .../main/java/org/apache/nemo/conf/JobConf.java    | 13 ----
 .../pass/runtime/DataSkewRuntimePass.java          | 79 +++++++++++++++-------
 .../partitioner/DataSkewHashPartitioner.java       | 11 ++-
 .../partitioner/DedicatedKeyPerElement.java        |  2 +-
 .../DedicatedKeyPerElementPartitioner.java         |  2 +-
 .../common}/partitioner/HashPartitioner.java       |  2 +-
 .../common}/partitioner/IntactPartitioner.java     |  4 +-
 .../runtime/common/partitioner/Partitioner.java}   | 46 ++++++-------
 .../runtime/common/plan/PhysicalPlanGenerator.java |  2 -
 .../pass/runtime/DataSkewRuntimePassTest.java      | 23 ++++++-
 .../runtime/common/plan/StagePartitionerTest.java  |  4 +-
 .../executor/data/partitioner/Partitioner.java     | 38 -----------
 .../executor/datatransfer/BlockOutputWriter.java   |  8 +--
 .../datatransfer/DynOptDataOutputCollector.java    |  1 +
 .../datatransfer/IntermediateDataIOFactory.java    | 12 +---
 .../executor/datatransfer/OutputWriter.java        | 42 ------------
 .../executor/datatransfer/PipeOutputWriter.java    |  8 +--
 .../runtime/master/scheduler/BatchScheduler.java   |  2 +-
 31 files changed, 195 insertions(+), 368 deletions(-)

diff --git a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
index 5f18a35..c9749a8 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/edge/executionproperty/MetricCollectionProperty.java
@@ -21,7 +21,7 @@ package org.apache.nemo.common.ir.edge.executionproperty;
 import org.apache.nemo.common.ir.executionproperty.EdgeExecutionProperty;
 
 /**
- * MetricCollection ExecutionProperty.
+ * MetricCollection ExecutionProperty that indicates the edge of which data metric will be collected.
  */
 public final class MetricCollectionProperty extends EdgeExecutionProperty<MetricCollectionProperty.Value> {
   /**
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
deleted file mode 100644
index e1ec738..0000000
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/executionproperty/DynamicOptimizationProperty.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.common.ir.vertex.executionproperty;
-
-import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
-
-/**
- * DynamicOptimizationType ExecutionProperty.
- */
-public final class DynamicOptimizationProperty extends VertexExecutionProperty<DynamicOptimizationProperty.Value> {
-  /**
-   * Constructor.
-   * @param value value of the execution property.
-   */
-  private DynamicOptimizationProperty(final Value value) {
-    super(value);
-  }
-
-  /**
-   * Static method exposing the constructor.
-   * @param value value of the new execution property.
-   * @return the newly created execution property.
-   */
-  public static DynamicOptimizationProperty of(final Value value) {
-    return new DynamicOptimizationProperty(value);
-  }
-
-  /**
-   * Possible values of DynamicOptimization ExecutionProperty.
-   */
-  public enum Value {
-    DataSkewRuntimePass
-  }
-}
diff --git a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
index 1e950be..2a0fe36 100644
--- a/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/org/apache/nemo/common/test/EmptyComponents.java
@@ -26,10 +26,7 @@ import org.apache.nemo.common.dag.DAGBuilder;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.DecoderProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.EncoderProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
+import org.apache.nemo.common.ir.edge.executionproperty.*;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
@@ -98,11 +95,15 @@ public final class EmptyComponents {
     shuffleEdgeBetweenT1AndT2.setProperty(KeyExtractorProperty.of(new DummyBeamKeyExtractor()));
     shuffleEdgeBetweenT1AndT2.setProperty(EncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
     shuffleEdgeBetweenT1AndT2.setProperty(DecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
+    shuffleEdgeBetweenT1AndT2.setProperty(KeyEncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
+    shuffleEdgeBetweenT1AndT2.setProperty(KeyDecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
 
     final IREdge shuffleEdgeBetweenT3AndT4 = new IREdge(CommunicationPatternProperty.Value.Shuffle, t3, t4);
     shuffleEdgeBetweenT3AndT4.setProperty(KeyExtractorProperty.of(new DummyBeamKeyExtractor()));
     shuffleEdgeBetweenT3AndT4.setProperty(EncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
     shuffleEdgeBetweenT3AndT4.setProperty(DecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
+    shuffleEdgeBetweenT3AndT4.setProperty(KeyEncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
+    shuffleEdgeBetweenT3AndT4.setProperty(KeyDecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
 
     dagBuilder.addVertex(s);
     dagBuilder.addVertex(t1);
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
index 824c99b..8207320 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/BeamKeyExtractor.java
@@ -29,11 +29,10 @@ import org.apache.beam.sdk.values.KV;
 final class BeamKeyExtractor implements KeyExtractor {
   @Override
   public Object extractKey(final Object element) {
-    final WindowedValue windowedValue = (WindowedValue) element;
-    final Object value = windowedValue.getValue();
-    if (value instanceof KV) {
+    final Object valueToExtract = element instanceof WindowedValue ? ((WindowedValue) element).getValue() : element;
+    if (valueToExtract instanceof KV) {
       // Handle null keys, since Beam allows KV with null keys.
-      final Object key = ((KV) value).getKey();
+      final Object key = ((KV) valueToExtract).getKey();
       return key == null ? 0 : key;
     } else {
       return element;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
deleted file mode 100644
index 0febda5..0000000
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import org.apache.nemo.common.dag.DAG;
-import org.apache.nemo.common.ir.edge.IREdge;
-import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
-import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
-
-/**
- * Pass to annotate the IR DAG for skew handling.
- *
- * It specifies the target of dynamic optimization for skew handling
- * by setting appropriate {@link MetricCollectionProperty} to
- * outgoing shuffle edges from vertices with {@link MetricCollectTransform}.
- */
-@Annotates(MetricCollectionProperty.class)
-@Requires(CommunicationPatternProperty.class)
-public final class SkewMetricCollectionPass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public SkewMetricCollectionPass() {
-    super(SkewMetricCollectionPass.class);
-  }
-
-  @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.topologicalDo(v -> {
-      // we only care about metric collection vertices.
-      if (v instanceof OperatorVertex
-        && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform) {
-        dag.getOutgoingEdgesOf(v).forEach(edge -> {
-          // double checking.
-          if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
-              .equals(CommunicationPatternProperty.Value.Shuffle)) {
-            edge.setPropertyPermanently(MetricCollectionProperty.of(
-                MetricCollectionProperty.Value.DataSkewRuntimePass));
-          }
-        });
-      }
-    });
-    return dag;
-  }
-}
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index b9f4705..45401e0 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -23,12 +23,8 @@ import org.apache.nemo.common.ir.edge.IREdge;
 import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 
-import java.util.List;
-
 /**
  * Transient resource pass for tagging edges with {@link PartitionerProperty}.
  */
@@ -44,19 +40,13 @@ public final class SkewPartitionerPass extends AnnotatingPass {
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.getVertices().forEach(v -> {
-      if (v instanceof OperatorVertex
-        && ((OperatorVertex) v).getTransform() instanceof AggregateMetricTransform) {
-        final List<IREdge> outEdges = dag.getOutgoingEdgesOf(v);
-        outEdges.forEach(edge -> {
-          // double checking.
-          if (MetricCollectionProperty.Value.DataSkewRuntimePass
-            .equals(edge.getPropertyValue(MetricCollectionProperty.class).get())) {
-            edge.setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner));
-          }
-        });
-      }
-    });
+    dag.getVertices()
+      .forEach(v -> dag.getOutgoingEdgesOf(v).stream()
+        .filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
+        .forEach(skewEdge -> skewEdge
+          .setPropertyPermanently(PartitionerProperty.of(PartitionerProperty.Value.DataSkewHashPartitioner))
+        )
+      );
     return dag;
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index 784ba64..425c939 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -20,22 +20,20 @@ package org.apache.nemo.compiler.optimizer.pass.compiletime.annotating;
 
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.ir.edge.IREdge;
+import org.apache.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
-import org.apache.nemo.common.ir.vertex.OperatorVertex;
-import org.apache.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
-import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
-
-import java.util.List;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 /**
  * Pass to annotate the IR DAG for skew handling.
  *
- * It marks children and descendents of vertex with {@link MetricCollectTransform},
+ * It marks children and descendents of vertex with {@link ResourceSkewedDataProperty},
  * which collects task-level statistics used for dynamic optimization,
  * with {@link ResourceSkewedDataProperty} to perform skewness-aware scheduling.
  */
-@Annotates(DynamicOptimizationProperty.class)
+@Annotates(ResourceSkewedDataProperty.class)
+@Requires(MetricCollectionProperty.class)
 public final class SkewResourceSkewedDataPass extends AnnotatingPass {
   /**
    * Default constructor.
@@ -44,40 +42,19 @@ public final class SkewResourceSkewedDataPass extends AnnotatingPass {
     super(SkewResourceSkewedDataPass.class);
   }
 
-  /**
-   * @param dag that contains the {@code v}.
-   * @param v to inspect.
-   * @return whether or not the vertex has parent with MetricCollectTransform.
-   */
-  private boolean hasParentWithMetricCollectTransform(final DAG<IRVertex, IREdge> dag,
-                                                      final IRVertex v) {
-    List<IRVertex> parents = dag.getParents(v.getId());
-    for (IRVertex parent : parents) {
-      if (parent instanceof OperatorVertex
-        && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.getVertices().stream()
-        .filter(v -> v instanceof OperatorVertex
-          && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform)
-      .forEach(v -> v.setProperty(DynamicOptimizationProperty
-            .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
-
-    dag.getVertices().stream()
-        .filter(v -> hasParentWithMetricCollectTransform(dag, v)
-            && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
-        .forEach(childV -> {
-          childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
-          dag.getDescendants(childV.getId()).forEach(descendentV -> {
+    dag.getVertices()
+      .forEach(v -> dag.getOutgoingEdgesOf(v).stream()
+        .filter(edge -> edge.getPropertyValue(MetricCollectionProperty.class).isPresent())
+        .forEach(skewEdge -> {
+          final IRVertex dstV = skewEdge.getDst();
+          dstV.setProperty(ResourceSkewedDataProperty.of(true));
+          dag.getDescendants(dstV.getId()).forEach(descendentV -> {
             descendentV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
           });
-        });
+        })
+      );
 
     return dag;
   }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
index bfae788..2ca955c 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
@@ -33,8 +33,8 @@ public final class SkewCompositePass extends CompositePass {
   public SkewCompositePass() {
     super(Arrays.asList(
         new SkewReshapingPass(),
-        new SkewResourceSkewedDataPass(),
-        new SkewMetricCollectionPass()
+        new SkewPartitionerPass(),
+        new SkewResourceSkewedDataPass()
     ));
   }
 }
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
index dd80faa..85af2dc 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/ReshapingPass.java
@@ -21,6 +21,7 @@ package org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping;
 import org.apache.nemo.common.ir.executionproperty.ExecutionProperty;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.Annotates;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.Set;
  * It is ensured by the compiler that no execution properties are modified by a ReshapingPass.
  */
 public abstract class ReshapingPass extends CompileTimePass {
+  private final Set<Class<? extends ExecutionProperty>> executionPropertiesToAnnotate;
   private final Set<Class<? extends ExecutionProperty>> prerequisiteExecutionProperties;
 
   /**
@@ -41,6 +43,18 @@ public abstract class ReshapingPass extends CompileTimePass {
     final Requires requires = cls.getAnnotation(Requires.class);
     this.prerequisiteExecutionProperties = requires == null
         ? new HashSet<>() : new HashSet<>(Arrays.asList(requires.value()));
+
+    final Annotates annotates = cls.getAnnotation(Annotates.class);
+    this.executionPropertiesToAnnotate = annotates == null
+      ? new HashSet<>() : new HashSet<>(Arrays.asList(annotates.value()));
+  }
+
+  /**
+   * Getter for the execution properties to annotate through the pass.
+   * @return key of execution properties to annotate through the pass.
+   */
+  public final Set<Class<? extends ExecutionProperty>> getExecutionPropertiesToAnnotate() {
+    return executionPropertiesToAnnotate;
   }
 
   /**
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index 0236b67..da436d9 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -35,6 +35,7 @@ import org.apache.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import org.apache.nemo.common.ir.vertex.transform.MetricCollectTransform;
 import org.apache.nemo.compiler.optimizer.PairKeyExtractor;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.Annotates;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +54,7 @@ import java.util.function.BiFunction;
  * 2) Stage-level statistic aggregation is done via vertex with {@link AggregateMetricTransform}
  * inserted before shuffle edges.
  * */
+@Annotates(MetricCollectionProperty.class)
 @Requires(CommunicationPatternProperty.class)
 public final class SkewReshapingPass extends ReshapingPass {
   private static final Logger LOG = LoggerFactory.getLogger(SkewReshapingPass.class.getName());
@@ -95,10 +97,17 @@ public final class SkewReshapingPass extends ReshapingPass {
             final IREdge edgeToOriginalDstV =
               new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
             edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
+            edgeToOriginalDstV.setPropertyPermanently(
+              MetricCollectionProperty.of(MetricCollectionProperty.Value.DataSkewRuntimePass));
 
             builder.connectVertices(edgeToMCV);
             builder.connectVertices(edgeToABV);
             builder.connectVertices(edgeToOriginalDstV);
+
+            // Add an control dependency (no output)
+            final IREdge emptyEdge =
+              new IREdge(CommunicationPatternProperty.Value.BroadCast, abv, v);
+            builder.connectVertices(emptyEdge);
           } else {
             builder.connectVertices(edge);
           }
@@ -199,7 +208,7 @@ public final class SkewReshapingPass extends ReshapingPass {
     final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, mcv, abv);
     newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
     newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
-    newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+    newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Push));
     newEdge.setProperty(KeyExtractorProperty.of(new PairKeyExtractor()));
     newEdge.setProperty(AdditionalOutputTagProperty.of(ADDITIONAL_OUTPUT_TAG));
 
@@ -210,12 +219,13 @@ public final class SkewReshapingPass extends ReshapingPass {
       && edge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
       final EncoderFactory keyEncoderFactory = edge.getPropertyValue(KeyEncoderProperty.class).get();
       final DecoderFactory keyDecoderFactory = edge.getPropertyValue(KeyDecoderProperty.class).get();
-      newEdge.setProperty(EncoderProperty.of(PairEncoderFactory.of(keyEncoderFactory, LongEncoderFactory.of())));
-      newEdge.setProperty(DecoderProperty.of(PairDecoderFactory.of(keyDecoderFactory, LongDecoderFactory.of())));
+      newEdge.setPropertyPermanently(
+        EncoderProperty.of(PairEncoderFactory.of(keyEncoderFactory, LongEncoderFactory.of())));
+      newEdge.setPropertyPermanently(
+        DecoderProperty.of(PairDecoderFactory.of(keyDecoderFactory, LongDecoderFactory.of())));
     } else {
       // If not specified, follow encoder/decoder of the given shuffle edge.
-      newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
-      newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
+      throw new RuntimeException("Skew optimization request for none key - value format data!");
     }
 
     return newEdge;
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index 2a48007..abb0bcf 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -34,8 +34,7 @@ import org.apache.reef.tang.Injector;
 public final class DataSkewPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
       new PolicyBuilder()
-        .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
-          new SkewCompositePass())
+        .registerRuntimePass(new DataSkewRuntimePass(), new SkewCompositePass())
         .registerCompileTimePass(new LoopOptimizationCompositePass())
         .registerCompileTimePass(new DefaultCompositePass());
 
diff --git a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java
index fe413a2..770cf73 100644
--- a/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java
+++ b/compiler/optimizer/src/main/java/org/apache/nemo/compiler/optimizer/policy/PolicyBuilder.java
@@ -32,6 +32,7 @@ import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
 import org.apache.nemo.compiler.optimizer.pass.compiletime.composite.CompositePass;
+import org.apache.nemo.compiler.optimizer.pass.compiletime.reshaping.ReshapingPass;
 import org.apache.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
 
 import java.util.*;
@@ -85,6 +86,9 @@ public final class PolicyBuilder {
     if (compileTimePass instanceof AnnotatingPass) {
       final AnnotatingPass annotatingPass = (AnnotatingPass) compileTimePass;
       this.annotatedExecutionProperties.addAll(annotatingPass.getExecutionPropertiesToAnnotate());
+    } else if (compileTimePass instanceof ReshapingPass) {
+      final ReshapingPass reshapingPass = (ReshapingPass) compileTimePass;
+      this.annotatedExecutionProperties.addAll(reshapingPass.getExecutionPropertiesToAnnotate());
     }
 
     this.compileTimePasses.add(compileTimePass);
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
index 8ef91e5..46937e9 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
@@ -101,8 +101,13 @@ public class SkewCompositePassTest {
     processedDAG.filterVertices(v -> v instanceof OperatorVertex
       && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform)
       .forEach(metricV -> {
-          List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
-          reducerV.forEach(rV -> assertTrue(rV.getPropertyValue(ResourceSkewedDataProperty.class).get()));
-        });
+          final List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
+          reducerV.forEach(rV -> {
+            if (rV instanceof OperatorVertex &&
+              !(((OperatorVertex) rV).getTransform() instanceof AggregateMetricTransform)) {
+              assertTrue(rV.getPropertyValue(ResourceSkewedDataProperty.class).get());
+            }
+          });
+      });
   }
 }
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index 1bdb7c9..7b31372 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -211,19 +211,6 @@ public final class JobConf extends ConfigurationModuleBuilder {
   }
 
   /**
-   * Hash range multiplier.
-   * If we need to split or recombine an output data from a task after it is stored,
-   * we multiply the hash range with this factor in advance
-   * to prevent the extra deserialize - rehash - serialize process.
-   * In these cases, the hash range will be (hash range multiplier X destination task parallelism).
-   * The reason why we do not divide the output into a fixed number is that the fixed number can be smaller than
-   * the destination task parallelism.
-   */
-  @NamedParameter(doc = "Hash range multiplier", short_name = "hash_range_multiplier", default_value = "10")
-  public final class HashRangeMultiplier implements Name<Integer> {
-  }
-
-  /**
    * The TCP port to which local block transfer binds. 0 means random port.
    */
   @NamedParameter(doc = "Port to which PartitionTransport binds (0 means random port)",
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index 63e131d..2f6c998 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -23,10 +23,13 @@ import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.dag.DAG;
 import org.apache.nemo.common.eventhandler.RuntimeEventHandler;
 
+import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.KeyRange;
 import org.apache.nemo.common.HashRange;
 import org.apache.nemo.runtime.common.eventhandler.DynamicOptimizationEventHandler;
+import org.apache.nemo.runtime.common.partitioner.DataSkewHashPartitioner;
+import org.apache.nemo.runtime.common.partitioner.Partitioner;
 import org.apache.nemo.runtime.common.plan.PhysicalPlan;
 import org.apache.nemo.runtime.common.plan.Stage;
 import org.apache.nemo.runtime.common.plan.StageEdge;
@@ -44,29 +47,39 @@ import java.util.stream.Collectors;
  */
 public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<Object, Long>>> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
+  private static final int DEFAULT_NUM_SKEWED_KEYS = 1;
+  /*
+   * Hash range multiplier.
+   * If we need to split or recombine an output data from a task after it is stored,
+   * we multiply the hash range with this factor in advance
+   * to prevent the extra deserialize - rehash - serialize process.
+   * In these cases, the hash range will be (hash range multiplier X destination task parallelism).
+   * The reason why we do not divide the output into a fixed number is that the fixed number can be smaller than
+   * the destination task parallelism.
+   */
+  public static final int HASH_RANGE_MULTIPLIER = 10;
+
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
   // Skewed keys denote for top n keys in terms of partition size.
-  public static final int DEFAULT_NUM_SKEWED_KEYS = 1;
-  private int numSkewedKeys;
+  private final int numSkewedKeys;
 
   /**
-   * Constructor.
+   * Constructor without expected number of skewed keys.
    */
   public DataSkewRuntimePass() {
-    this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
-    this.numSkewedKeys = DEFAULT_NUM_SKEWED_KEYS;
+    this(DEFAULT_NUM_SKEWED_KEYS);
   }
 
+  /**
+   * Constructor with expected number of skewed keys.
+   *
+   * @param numOfSkewedKeys the expected number of skewed keys.
+   */
   public DataSkewRuntimePass(final int numOfSkewedKeys) {
-    this();
+    this.eventHandlers = Collections.singleton(DynamicOptimizationEventHandler.class);
     this.numSkewedKeys = numOfSkewedKeys;
   }
 
-  public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
-    numSkewedKeys = numOfSkewedKeys;
-    return this;
-  }
-
   @Override
   public Set<Class<? extends RuntimeEventHandler>> getEventHandlerClasses() {
     return this.eventHandlers;
@@ -79,9 +92,15 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<O
     // Get number of evaluators of the next stage (number of blocks).
     final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).
         orElseThrow(() -> new RuntimeException("No parallelism on a vertex"));
+    if (!PartitionerProperty.Value.DataSkewHashPartitioner
+      .equals(targetEdge.getPropertyValue(PartitionerProperty.class)
+        .orElseThrow(() -> new RuntimeException("No partitioner property!")))) {
+      throw new RuntimeException("Invalid partitioner is assigned to the target edge!");
+    }
+    final DataSkewHashPartitioner partitioner = (DataSkewHashPartitioner) Partitioner.getPartitioner(targetEdge);
 
     // Calculate keyRanges.
-    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism);
+    final List<KeyRange> keyRanges = calculateKeyRanges(metricData.right(), dstParallelism, partitioner);
     final Map<Integer, KeyRange> taskIdxToKeyRange = new HashMap<>();
     for (int i = 0; i < dstParallelism; i++) {
       taskIdxToKeyRange.put(i, keyRanges.get(i));
@@ -129,25 +148,39 @@ public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<O
 
   /**
    * Evenly distribute the skewed data to the destination tasks.
-   * Partition denotes for a keyed portion of a Task output, whose key is a key.
-   * Using a map of key to partition size, this method groups the given partitions
-   * to a key range of partitions with approximate size of (total size of partitions / the number of tasks).
+   * Partition denotes for a keyed portion of a Task output.
+   * Using a map of actual data key to count, this method gets the size of each the given partitions and
+   * redistribute the key range of partitions with approximate size of (total size of partitions / the number of tasks).
+   * Assumption: the returned key of the partitioner is always 0 or positive integer.
    *
-   * @param keyToPartitionSizeMap a map of key to partition size.
+   * @param keyToCountMap  a map of actual key to count.
    * @param dstParallelism the number of tasks that receive this data as input.
+   * @param partitioner    the partitioner.
    * @return the list of key ranges calculated.
    */
   @VisibleForTesting
-  public List<KeyRange> calculateKeyRanges(final Map<Object, Long> keyToPartitionSizeMap,
-                                           final Integer dstParallelism) {
-    final List<Long> partitionSizeList = new ArrayList<>();
-    keyToPartitionSizeMap.forEach((k, v) -> partitionSizeList.add(v));
+  public List<KeyRange> calculateKeyRanges(final Map<Object, Long> keyToCountMap,
+                                           final Integer dstParallelism,
+                                           final Partitioner<Integer> partitioner) {
+    final Map<Integer, Long> partitionKeyToPartitionCount = new HashMap<>();
+    int lastKey = 0;
+    // Aggregate the counts per each "partition key" assigned by Partitioner.
+
+    for (final Map.Entry<Object, Long> entry : keyToCountMap.entrySet()) {
+      final int partitionKey = partitioner.partition(entry.getKey());
+      lastKey = Math.max(lastKey, partitionKey);
+      partitionKeyToPartitionCount.compute(partitionKey,
+        (existPartitionKey, prevCount) -> (prevCount == null) ? entry.getValue() : prevCount + entry.getValue());
+    }
 
-    // Get the last index.
-    final int lastKey = partitionSizeList.size() - 1;
+    final List<Long> partitionSizeList = new ArrayList<>(lastKey + 1);
+    for (int i = 0; i <= lastKey; i++) {
+      final long countsForKey = partitionKeyToPartitionCount.getOrDefault(i, 0L);
+      partitionSizeList.add(countsForKey);
+    }
 
     // Identify skewed sizes, which is top numSkewedKeys number of keys.
-    List<Long> skewedSizes = identifySkewedKeys(partitionSizeList);
+    final List<Long> skewedSizes = identifySkewedKeys(partitionSizeList);
 
     // Calculate the ideal size for each destination task.
     final Long totalSize = partitionSizeList.stream().mapToLong(n -> n).sum(); // get total size
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DataSkewHashPartitioner.java
similarity index 87%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DataSkewHashPartitioner.java
index a79defc..910a4b3 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DataSkewHashPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DataSkewHashPartitioner.java
@@ -16,9 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
 
 import org.apache.nemo.common.KeyExtractor;
+import org.apache.nemo.runtime.common.optimizer.pass.runtime.DataSkewRuntimePass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +33,7 @@ import java.math.BigInteger;
  * When we need to split or recombine the output data from a task after it is stored,
  * we multiply the hash range with a multiplier, which is commonly-known by the source and destination tasks,
  * to prevent the extra deserialize - rehash - serialize process.
- * For more information, please check {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
+ * For more information, please check {@link DataSkewRuntimePass#HASH_RANGE_MULTIPLIER}.
  */
 public final class DataSkewHashPartitioner implements Partitioner<Integer> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewHashPartitioner.class.getName());
@@ -43,17 +44,15 @@ public final class DataSkewHashPartitioner implements Partitioner<Integer> {
   /**
    * Constructor.
    *
-   * @param hashRangeMultiplier the hash range multiplier.
    * @param dstParallelism      the number of destination tasks.
    * @param keyExtractor        the key extractor that extracts keys from elements.
    */
-  public DataSkewHashPartitioner(final int hashRangeMultiplier,
-                                 final int dstParallelism,
+  public DataSkewHashPartitioner(final int dstParallelism,
                                  final KeyExtractor keyExtractor) {
     this.keyExtractor = keyExtractor;
     // For this hash range, please check the description of HashRangeMultiplier in JobConf.
     // For actual hash range to use, we calculate a prime number right next to the desired hash range.
-    this.hashRangeBase = new BigInteger(String.valueOf(dstParallelism * hashRangeMultiplier));
+    this.hashRangeBase = new BigInteger(String.valueOf(dstParallelism * DataSkewRuntimePass.HASH_RANGE_MULTIPLIER));
     this.hashRange = hashRangeBase.nextProbablePrime().intValue();
     LOG.info("hashRangeBase {} resulting hashRange {}", hashRangeBase, hashRange);
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElement.java
similarity index 94%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElement.java
index 4667702..bf476a8 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElement.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElement.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
 
 import java.lang.annotation.*;
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
similarity index 96%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
index 43d11cf..e9504c2 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/DedicatedKeyPerElementPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/DedicatedKeyPerElementPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
 
 /**
  * An implementation of {@link Partitioner} which assigns a dedicated key per an output data from a task.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/HashPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/HashPartitioner.java
similarity index 96%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/HashPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/HashPartitioner.java
index 175a4f8..605146f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/HashPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/HashPartitioner.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
 
 import org.apache.nemo.common.KeyExtractor;
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/IntactPartitioner.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/IntactPartitioner.java
similarity index 86%
rename from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
rename to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/IntactPartitioner.java
index b6253d2..dc8d7e6 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/IntactPartitioner.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/IntactPartitioner.java
@@ -16,11 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.executor.data.partitioner;
+package org.apache.nemo.runtime.common.partitioner;
 
 /**
  * An implementation of {@link Partitioner} which makes an output data
- * from a source task to a single {@link org.apache.nemo.runtime.executor.data.partition.Partition}.
+ * from a source task to a single partition.
  */
 public final class IntactPartitioner implements Partitioner<Integer> {
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/Partitioner.java
similarity index 72%
copy from runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
copy to runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/Partitioner.java
index 301c95a..eb99d7a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/partitioner/Partitioner.java
@@ -16,52 +16,48 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.nemo.runtime.executor.datatransfer;
+package org.apache.nemo.runtime.common.partitioner;
 
 import org.apache.nemo.common.KeyExtractor;
 import org.apache.nemo.common.exception.UnsupportedPartitionerException;
 import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
-import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.runtime.executor.data.partitioner.*;
 
-import java.util.*;
+import java.io.Serializable;
 
 /**
- * Represents the output data transfer from a task.
+ * This interface represents the way of partitioning output data from a source task.
+ * It takes an element and designates key of partition to write the element,
+ * according to the number of destination tasks, the key of each element, etc.
+ *
+ * @param <K> the key type of the partition to write.
  */
-public interface OutputWriter {
-  /**
-   * Writes output element depending on the communication pattern of the edge.
-   *
-   * @param element the element to write.
-   */
-  void write(final Object element);
+public interface Partitioner<K extends Serializable> {
 
   /**
-   * Writes watermarks to all edges.
-   * @param watermark watermark
+   * Divides the output data from a task into multiple blocks.
+   *
+   * @param element the output element from a source task.
+   * @return the key of the partition in the block to write the element.
    */
-  void writeWatermark(final Watermark watermark);
+  K partition(Object element);
 
   /**
-   * @return the total written bytes.
+   * Gets appropriate partitioner for an edge.
+   *
+   * @param runtimeEdge the runtime edge.
+   * @return the partitioner for the edge.
    */
-  Optional<Long> getWrittenBytes();
-
-  void close();
-
-
-  static Partitioner getPartitioner(final RuntimeEdge runtimeEdge,
-                                    final int hashRangeMultiplier) {
+  static Partitioner getPartitioner(final RuntimeEdge runtimeEdge) {
     final StageEdge stageEdge = (StageEdge) runtimeEdge;
     final PartitionerProperty.Value partitionerPropertyValue =
       (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
     final int dstParallelism =
-      stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class).get();
+      stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class)
+        .orElseThrow(() -> new RuntimeException("No parallelism in edge " + runtimeEdge.getId()));
 
     final Partitioner partitioner;
     switch (partitionerPropertyValue) {
@@ -76,7 +72,7 @@ public interface OutputWriter {
       case DataSkewHashPartitioner:
         final KeyExtractor dataSkewKeyExtractor =
           (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
-        partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, dataSkewKeyExtractor);
+        partitioner = new DataSkewHashPartitioner(dstParallelism, dataSkewKeyExtractor);
         break;
       case DedicatedKeyPerElementPartitioner:
         partitioner = new DedicatedKeyPerElementPartitioner();
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index bbaf578..39f4801 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -25,7 +25,6 @@ import org.apache.nemo.common.ir.edge.executionproperty.DuplicateEdgeGroupProper
 import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
 import org.apache.nemo.common.ir.executionproperty.VertexExecutionProperty;
 import org.apache.nemo.common.ir.vertex.*;
-import org.apache.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import org.apache.nemo.conf.JobConf;
@@ -63,7 +62,6 @@ public final class PhysicalPlanGenerator implements Function<DAG<IRVertex, IREdg
                                 @Parameter(JobConf.DAGDirectory.class) final String dagDirectory) {
     this.dagDirectory = dagDirectory;
     this.stagePartitioner = stagePartitioner;
-    stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
   }
 
   /**
diff --git a/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 49327cd..605c5c3 100644
--- a/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/org/apache/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -19,7 +19,10 @@
 package org.apache.nemo.runtime.common.optimizer.pass.runtime;
 
 import org.apache.nemo.common.HashRange;
+import org.apache.nemo.common.KeyExtractor;
 import org.apache.nemo.common.KeyRange;
+import org.apache.nemo.runtime.common.partitioner.HashPartitioner;
+import org.apache.nemo.runtime.common.partitioner.Partitioner;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -48,9 +51,11 @@ public class DataSkewRuntimePassTest {
   @Test
   public void testDataSkewDynamicOptimizationPass() {
     final Integer taskNum = 5;
+    final KeyExtractor asIsExtractor = new AsIsKeyExtractor();
+    final Partitioner partitioner = new HashPartitioner(taskNum, asIsExtractor);
 
     final List<KeyRange> keyRanges =
-        new DataSkewRuntimePass().setNumSkewedKeys(2).calculateKeyRanges(testMetricData, taskNum);
+        new DataSkewRuntimePass(2).calculateKeyRanges(testMetricData, taskNum, partitioner);
 
     // Test whether it correctly redistributed hash ranges.
     assertEquals(0, keyRanges.get(0).rangeBeginInclusive());
@@ -63,7 +68,7 @@ public class DataSkewRuntimePassTest {
     assertEquals(5, keyRanges.get(3).rangeEndExclusive());
     assertEquals(5, keyRanges.get(4).rangeBeginInclusive());
     assertEquals(5, keyRanges.get(4).rangeEndExclusive());
-  
+
     // Test whether it caught the provided skewness.
     assertEquals(false, ((HashRange)keyRanges.get(0)).isSkewed());
     assertEquals(false, ((HashRange)keyRanges.get(1)).isSkewed());
@@ -89,4 +94,18 @@ public class DataSkewRuntimePassTest {
       key++;
     }
   }
+
+  /**
+   * Custom {@link KeyExtractor} which returns the element as is.
+   */
+  private final class AsIsKeyExtractor implements KeyExtractor {
+
+    /**
+     * @see KeyExtractor#extractKey(Object).
+     */
+    @Override
+    public Object extractKey(final Object element) {
+      return element;
+    }
+  }
 }
diff --git a/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java b/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java
index cdfafe3..3c0cadb 100644
--- a/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java
+++ b/runtime/common/src/test/java/org/apache/nemo/runtime/common/plan/StagePartitionerTest.java
@@ -49,7 +49,7 @@ public final class StagePartitionerTest {
   @Before
   public void setup() throws InjectionException {
     stagePartitioner = Tang.Factory.getTang().newInjector().getInstance(StagePartitioner.class);
-    stagePartitioner.addIgnoredPropertyKey(DynamicOptimizationProperty.class);
+    stagePartitioner.addIgnoredPropertyKey(IgnoreSchedulingTempDataReceiverProperty.class);
   }
 
   /**
@@ -151,7 +151,7 @@ public final class StagePartitionerTest {
   public void testNotSplitByIgnoredProperty() {
     final DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
     final IRVertex v0 = newVertex(1, 0,
-        Arrays.asList(DynamicOptimizationProperty.of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
+        Arrays.asList(IgnoreSchedulingTempDataReceiverProperty.of()));
     final IRVertex v1 = newVertex(1, 0, Collections.emptyList());
     dagBuilder.addVertex(v0);
     dagBuilder.addVertex(v1);
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/Partitioner.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/Partitioner.java
deleted file mode 100644
index bdaf05a..0000000
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/partitioner/Partitioner.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.nemo.runtime.executor.data.partitioner;
-
-import java.io.Serializable;
-
-/**
- * This interface represents the way of partitioning output data from a source task.
- * It takes an element and designates key of {@link org.apache.nemo.runtime.executor.data.partition.Partition}
- * to write the element, according to the number of destination tasks, the key of each element, etc.
- * @param <K> the key type of the partition to write.
- */
-public interface Partitioner<K extends Serializable> {
-
-  /**
-   * Divides the output data from a task into multiple blocks.
-   *
-   * @param element        the output element from a source task.
-   * @return the key of the partition in the block to write the element.
-   */
-   K partition(Object element);
-}
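
The interface itself survives this deletion: as the import changes in the files below show, it now lives in org.apache.nemo.runtime.common.partitioner, where DataSkewRuntimePass can reach it. A sketch of the relocated declaration, reconstructed from the file deleted above (license header omitted; the static getPartitioner factory used by the writers below is sketched separately after the OutputWriter diff):

    package org.apache.nemo.runtime.common.partitioner;

    import java.io.Serializable;

    /**
     * Represents the way of partitioning output data from a source task.
     * It takes an element and designates the key of the partition to write the element to,
     * according to the number of destination tasks, the key of each element, etc.
     *
     * @param <K> the key type of the partition to write.
     */
    public interface Partitioner<K extends Serializable> {

      /**
       * Divides the output data from a task into multiple blocks.
       *
       * @param element the output element from a source task.
       * @return the key of the partition in the block to write the element.
       */
      K partition(Object element);
    }
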
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 4b85087..9590911 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -26,7 +26,7 @@ import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.block.Block;
-import org.apache.nemo.runtime.executor.data.partitioner.*;
+import org.apache.nemo.runtime.common.partitioner.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,21 +53,19 @@ public final class BlockOutputWriter implements OutputWriter {
   /**
    * Constructor.
    *
-   * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
    * @param srcTaskId           the id of the source task.
    * @param dstIrVertex         the destination IR vertex.
    * @param runtimeEdge         the {@link RuntimeEdge}.
    * @param blockManagerWorker  the {@link BlockManagerWorker}.
    */
-  BlockOutputWriter(final int hashRangeMultiplier,
-                    final String srcTaskId,
+  BlockOutputWriter(final String srcTaskId,
                     final IRVertex dstIrVertex,
                     final RuntimeEdge<?> runtimeEdge,
                     final BlockManagerWorker blockManagerWorker) {
     this.runtimeEdge = runtimeEdge;
     this.dstIrVertex = dstIrVertex;
 
-    this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+    this.partitioner = Partitioner.getPartitioner(runtimeEdge);
     this.blockManagerWorker = blockManagerWorker;
     this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class)
       .orElseThrow(() -> new RuntimeException("No data store property on the edge"));
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
index bbe4c72..8a5934b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DynOptDataOutputCollector.java
@@ -61,6 +61,7 @@ public final class DynOptDataOutputCollector<O> implements OutputCollector<O> {
     aggregatedDynOptData.forEach((key, size) ->
       partitionSizeEntries.add(
         ControlMessage.PartitionSizeEntry.newBuilder()
+          // TODO #325: Add (de)serialization for non-string key types in data metric collection
           .setKey(key == null ? NULL_KEY : String.valueOf(key))
           .setSize(size)
           .build())
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
index a90cc79..e3c4a0c 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/IntermediateDataIOFactory.java
@@ -19,13 +19,11 @@
 package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import org.apache.nemo.conf.JobConf;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.executor.data.BlockManagerWorker;
 import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
-import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
 import java.util.Optional;
@@ -36,13 +34,10 @@ import java.util.Optional;
 public final class IntermediateDataIOFactory {
   private final PipeManagerWorker pipeManagerWorker;
   private final BlockManagerWorker blockManagerWorker;
-  private final int hashRangeMultiplier;
 
   @Inject
-  private IntermediateDataIOFactory(@Parameter(JobConf.HashRangeMultiplier.class) final int hashRangeMultiplier,
-                                    final BlockManagerWorker blockManagerWorker,
+  private IntermediateDataIOFactory(final BlockManagerWorker blockManagerWorker,
                                     final PipeManagerWorker pipeManagerWorker) {
-    this.hashRangeMultiplier = hashRangeMultiplier;
     this.blockManagerWorker = blockManagerWorker;
     this.pipeManagerWorker = pipeManagerWorker;
   }
@@ -57,11 +52,10 @@ public final class IntermediateDataIOFactory {
   public OutputWriter createWriter(final String srcTaskId,
                                    final RuntimeEdge<?> runtimeEdge) {
     if (isPipe(runtimeEdge)) {
-      return new PipeOutputWriter(hashRangeMultiplier, srcTaskId, runtimeEdge, pipeManagerWorker);
+      return new PipeOutputWriter(srcTaskId, runtimeEdge, pipeManagerWorker);
     } else {
       final StageEdge stageEdge = (StageEdge) runtimeEdge;
-      return new BlockOutputWriter(
-        hashRangeMultiplier, srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
+      return new BlockOutputWriter(srcTaskId, stageEdge.getDstIRVertex(), runtimeEdge, blockManagerWorker);
     }
   }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index 301c95a..c976cf4 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -18,15 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
-import org.apache.nemo.common.KeyExtractor;
-import org.apache.nemo.common.exception.UnsupportedPartitionerException;
-import org.apache.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
-import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
-import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.punctuation.Watermark;
-import org.apache.nemo.runtime.common.plan.RuntimeEdge;
-import org.apache.nemo.runtime.common.plan.StageEdge;
-import org.apache.nemo.runtime.executor.data.partitioner.*;
 
 import java.util.*;
 
@@ -53,38 +45,4 @@ public interface OutputWriter {
   Optional<Long> getWrittenBytes();
 
   void close();
-
-
-  static Partitioner getPartitioner(final RuntimeEdge runtimeEdge,
-                                    final int hashRangeMultiplier) {
-    final StageEdge stageEdge = (StageEdge) runtimeEdge;
-    final PartitionerProperty.Value partitionerPropertyValue =
-      (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
-    final int dstParallelism =
-      stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class).get();
-
-    final Partitioner partitioner;
-    switch (partitionerPropertyValue) {
-      case IntactPartitioner:
-        partitioner = new IntactPartitioner();
-        break;
-      case HashPartitioner:
-        final KeyExtractor hashKeyExtractor =
-          (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
-        partitioner = new HashPartitioner(dstParallelism, hashKeyExtractor);
-        break;
-      case DataSkewHashPartitioner:
-        final KeyExtractor dataSkewKeyExtractor =
-          (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class);
-        partitioner = new DataSkewHashPartitioner(hashRangeMultiplier, dstParallelism, dataSkewKeyExtractor);
-        break;
-      case DedicatedKeyPerElementPartitioner:
-        partitioner = new DedicatedKeyPerElementPartitioner();
-        break;
-      default:
-        throw new UnsupportedPartitionerException(
-          new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
-    }
-    return partitioner;
-  }
 }
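
The switch removed here is not dropped; the writers in this diff now call Partitioner.getPartitioner(runtimeEdge), a static factory on the relocated Partitioner interface in runtime common. A hedged sketch of that factory, assuming the DataSkewHashPartitioner constructor simply loses the hashRangeMultiplier argument now that the multiplier is gone from JobConf (see the runtime-common changes earlier in this commit for the actual code):

    // Sketch of the relocated factory method (imports and the enclosing interface omitted).
    static Partitioner getPartitioner(final RuntimeEdge runtimeEdge) {
      final StageEdge stageEdge = (StageEdge) runtimeEdge;
      final PartitionerProperty.Value partitionerPropertyValue =
        (PartitionerProperty.Value) runtimeEdge.getPropertyValueOrRuntimeException(PartitionerProperty.class);
      final int dstParallelism =
        stageEdge.getDstIRVertex().getPropertyValue(ParallelismProperty.class).get();

      switch (partitionerPropertyValue) {
        case IntactPartitioner:
          return new IntactPartitioner();
        case HashPartitioner:
          return new HashPartitioner(dstParallelism,
            (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class));
        case DataSkewHashPartitioner:
          // Assumption: no hashRangeMultiplier parameter anymore.
          return new DataSkewHashPartitioner(dstParallelism,
            (KeyExtractor) runtimeEdge.getPropertyValueOrRuntimeException(KeyExtractorProperty.class));
        case DedicatedKeyPerElementPartitioner:
          return new DedicatedKeyPerElementPartitioner();
        default:
          throw new UnsupportedPartitionerException(
            new Throwable("Partitioner " + partitionerPropertyValue + " is not supported."));
      }
    }
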
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index f937975..c8e0dbe 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -24,7 +24,7 @@ import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.executor.bytetransfer.ByteOutputContext;
 import org.apache.nemo.runtime.executor.data.PipeManagerWorker;
-import org.apache.nemo.runtime.executor.data.partitioner.Partitioner;
+import org.apache.nemo.runtime.common.partitioner.Partitioner;
 import org.apache.nemo.runtime.executor.data.streamchainer.Serializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,20 +54,18 @@ public final class PipeOutputWriter implements OutputWriter {
   /**
    * Constructor.
    *
-   * @param hashRangeMultiplier the {@link org.apache.nemo.conf.JobConf.HashRangeMultiplier}.
    * @param srcTaskId           the id of the source task.
    * @param runtimeEdge         the {@link RuntimeEdge}.
    * @param pipeManagerWorker   the pipe manager.
    */
-  PipeOutputWriter(final int hashRangeMultiplier,
-                   final String srcTaskId,
+  PipeOutputWriter(final String srcTaskId,
                    final RuntimeEdge runtimeEdge,
                    final PipeManagerWorker pipeManagerWorker) {
     this.initialized = false;
     this.srcTaskId = srcTaskId;
     this.pipeManagerWorker = pipeManagerWorker;
     this.pipeManagerWorker.notifyMaster(runtimeEdge.getId(), RuntimeIdManager.getIndexFromTaskId(srcTaskId));
-    this.partitioner = OutputWriter.getPartitioner(runtimeEdge, hashRangeMultiplier);
+    this.partitioner = Partitioner.getPartitioner(runtimeEdge);
     this.runtimeEdge = runtimeEdge;
     this.srcTaskIndex = RuntimeIdManager.getIndexFromTaskId(srcTaskId);
   }
diff --git a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
index b80a2ce..bb76bc5 100644
--- a/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -449,7 +449,7 @@ public final class BatchScheduler implements Scheduler {
     }
 
     // Get outgoing edges of that stage with MetricCollectionProperty
-    List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG()
+    final List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG()
       .getOutgoingEdgesOf(parentStages.get(0));
     for (StageEdge edge : stageEdges) {
       if (edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {