Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/09/05 05:02:24 UTC

[GitHub] sanha closed pull request #115: [NEMO-96] Modularize DataSkewPolicy to use MetricVertex and BarrierVertex

URL: https://github.com/apache/incubator-nemo/pull/115

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/bin/json2dot.py b/bin/json2dot.py
index f41146b64..f3caf7dd4 100755
--- a/bin/json2dot.py
+++ b/bin/json2dot.py
@@ -157,9 +157,9 @@ def dot(self):
             label += '<BR/>{}:{}'.format(transform_name, class_name)
         except:
             pass
-        if ('class' in self.properties and self.properties['class'] == 'MetricCollectionBarrierVertex'):
+        if ('class' in self.properties and self.properties['class'] == 'AggregationBarrierVertex'):
             shape = ', shape=box'
-            label += '<BR/>MetricCollectionBarrier'
+            label += '<BR/>AggregationBarrier'
         else:
             shape = ''
         try:
diff --git a/common/pom.xml b/common/pom.xml
index da5a48c80..18ef10b02 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -52,5 +52,11 @@ limitations under the License.
             <version>${hadoop.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.beam</groupId>
+          <artifactId>beam-sdks-java-core</artifactId>
+          <version>${beam.version}</version>
+        </dependency>
+
     </dependencies>
 </project>
diff --git a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
index be7cf592c..23bc6abd3 100644
--- a/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
+++ b/common/src/main/java/edu/snu/nemo/common/KeyExtractor.java
@@ -24,6 +24,7 @@
 public interface KeyExtractor extends Serializable {
   /**
    * Extracts key.
+   *
    * @param element Element to get the key from.
    * @return The extracted key of the element.
    */
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
index dc67ff355..16fa877c5 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/DecoderFactory.java
@@ -20,8 +20,8 @@
 import java.io.Serializable;
 
 /**
- * A decoder factory object which generates decoders that decode values of type {@code T} into byte streams.
- * To avoid to generate instance-based coder such as Spark serializer for every decoding,
+ * A decoder factory object which generates decoders that decode byte streams into values of type {@code T}.
+ * To avoid generating an instance-based coder such as the Spark serializer for every decoding,
 * the user needs to instantiate a decoder instance and use it.
  *
  * @param <T> element type.
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
index d63fafb9c..82c3730c0 100644
--- a/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
+++ b/common/src/main/java/edu/snu/nemo/common/coder/EncoderFactory.java
@@ -46,7 +46,7 @@
 
     /**
      * Encodes the given value onto the specified output stream.
-     * It have to be able to encode the given stream consequently by calling this method repeatedly.
+     * It has to be able to encode the given stream consecutively by calling this method repeatedly.
 * Because the user may want to keep a single output stream and continuously concatenate elements,
      * the output stream should not be closed.
      *
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
new file mode 100644
index 000000000..4335413a4
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/LongDecoderFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A {@link DecoderFactory} which is used for long.
+ */
+public final class LongDecoderFactory implements DecoderFactory<Long> {
+
+  private static final LongDecoderFactory LONG_DECODER_FACTORY = new LongDecoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private LongDecoderFactory() {
+    // do nothing.
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static LongDecoderFactory of() {
+    return LONG_DECODER_FACTORY;
+  }
+
+  @Override
+  public Decoder<Long> create(final InputStream inputStream) {
+    return new LongDecoder(inputStream);
+  }
+
+  /**
+   * LongDecoder.
+   */
+  private final class LongDecoder implements Decoder<Long> {
+    private final DataInputStream inputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param inputStream  the input stream to decode.
+     */
+    private LongDecoder(final InputStream inputStream) {
+      // If the inputStream is closed well in upper level, it is okay to not close this stream
+      // because the DataInputStream itself will not contain any extra information.
+      // (when we close this stream, the input will be closed together.)
+      this.inputStream = new DataInputStream(inputStream);
+    }
+
+    @Override
+    public Long decode() throws IOException {
+      return inputStream.readLong();
+    }
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/coder/LongEncoderFactory.java b/common/src/main/java/edu/snu/nemo/common/coder/LongEncoderFactory.java
new file mode 100644
index 000000000..f6e5aaa32
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/coder/LongEncoderFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.coder;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * An {@link EncoderFactory} which is used for long.
+ */
+public final class LongEncoderFactory implements EncoderFactory<Long> {
+
+  private static final LongEncoderFactory LONG_ENCODER_FACTORY = new LongEncoderFactory();
+
+  /**
+   * A private constructor.
+   */
+  private LongEncoderFactory() {
+    // do nothing.
+  }
+
+  /**
+   * Static initializer of the coder.
+   */
+  public static LongEncoderFactory of() {
+    return LONG_ENCODER_FACTORY;
+  }
+
+  @Override
+  public Encoder<Long> create(final OutputStream outputStream) {
+    return new LongEncoder(outputStream);
+  }
+
+  /**
+   * LongEncoder.
+   */
+  private final class LongEncoder implements Encoder<Long> {
+
+    private final DataOutputStream outputStream;
+
+    /**
+     * Constructor.
+     *
+     * @param outputStream the output stream to store the encoded bytes.
+     */
+    private LongEncoder(final OutputStream outputStream) {
+      // If the outputStream is closed well in upper level, it is okay to not close this stream
+      // because the DataOutputStream itself will not contain any extra information.
+      // (when we close this stream, the output will be closed together.)
+      this.outputStream = new DataOutputStream(outputStream);
+    }
+
+    @Override
+    public void encode(final Long value) throws IOException {
+      outputStream.writeLong(value);
+    }
+  }
+}
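
For reference, the two factories above are symmetric: the encoder writes each
value via DataOutputStream.writeLong and the decoder reads it back via
DataInputStream.readLong. Below is a minimal round-trip sketch against the
classes added above (an editor's illustration, not part of this diff; the
class name LongCoderRoundTrip is hypothetical):

    import edu.snu.nemo.common.coder.LongDecoderFactory;
    import edu.snu.nemo.common.coder.LongEncoderFactory;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public final class LongCoderRoundTrip {
      public static void main(final String[] args) throws IOException {
        // Encode a long into an in-memory byte stream.
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        LongEncoderFactory.of().create(out).encode(42L);

        // Decode it back; readLong() consumes the 8 bytes writeLong() produced.
        final ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
        final Long decoded = LongDecoderFactory.of().create(in).decode();
        assert decoded == 42L;
      }
    }
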
diff --git a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
index f9b266e56..442b7ddd0 100644
--- a/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
+++ b/common/src/main/java/edu/snu/nemo/common/dag/DAGBuilder.java
@@ -20,10 +20,7 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DataFlowProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.OperatorVertex;
-import edu.snu.nemo.common.ir.vertex.SourceVertex;
-import edu.snu.nemo.common.ir.vertex.LoopVertex;
+import edu.snu.nemo.common.ir.vertex.*;
 import edu.snu.nemo.common.exception.IllegalVertexOperationException;
 
 import java.io.Serializable;
@@ -245,7 +242,8 @@ private void sinkCheck() {
         .filter(v -> outgoingEdges.get(v).isEmpty())
         .filter(v -> v instanceof IRVertex);
     // They should either be OperatorVertex or LoopVertex
-    if (verticesToObserve.get().anyMatch(v -> !(v instanceof OperatorVertex || v instanceof LoopVertex))) {
+    if (verticesToObserve.get().anyMatch(v ->
+      !(v instanceof OperatorVertex || v instanceof LoopVertex))) {
       final String problematicVertices = verticesToObserve.get().filter(v ->
           !(v instanceof OperatorVertex || v instanceof LoopVertex))
           .map(V::getId).collect(Collectors.toList()).toString();
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyDecoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyDecoderProperty.java
new file mode 100644
index 000000000..0a567ec4f
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyDecoderProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * KeyDecoder ExecutionProperty.
+ */
+public final class KeyDecoderProperty extends EdgeExecutionProperty<DecoderFactory> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private KeyDecoderProperty(final DecoderFactory value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static KeyDecoderProperty of(final DecoderFactory value) {
+    return new KeyDecoderProperty(value);
+  }
+}
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyEncoderProperty.java b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyEncoderProperty.java
new file mode 100644
index 000000000..c76748439
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/edge/executionproperty/KeyEncoderProperty.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.edge.executionproperty;
+
+import edu.snu.nemo.common.coder.EncoderFactory;
+import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty;
+
+/**
+ * KeyEncoder ExecutionProperty.
+ */
+public final class KeyEncoderProperty extends EdgeExecutionProperty<EncoderFactory> {
+  /**
+   * Constructor.
+   *
+   * @param value value of the execution property.
+   */
+  private KeyEncoderProperty(final EncoderFactory value) {
+    super(value);
+  }
+
+  /**
+   * Static method exposing the constructor.
+   *
+   * @param value value of the new execution property.
+   * @return the newly created execution property.
+   */
+  public static KeyEncoderProperty of(final EncoderFactory value) {
+    return new KeyEncoderProperty(value);
+  }
+}
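
These two properties mirror the existing EncoderProperty/DecoderProperty but
carry coders for keys only, so that dynamic optimization can encode and decode
just the key part of key-value data. A hedged sketch of annotating a shuffle
edge (src and dst are placeholder vertices; the Long factories are merely one
possible key coder pair):

    import edu.snu.nemo.common.coder.LongDecoderFactory;
    import edu.snu.nemo.common.coder.LongEncoderFactory;
    import edu.snu.nemo.common.ir.edge.IREdge;
    import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
    import edu.snu.nemo.common.ir.edge.executionproperty.KeyDecoderProperty;
    import edu.snu.nemo.common.ir.edge.executionproperty.KeyEncoderProperty;
    import edu.snu.nemo.common.ir.vertex.IRVertex;

    final class KeyCoderExample {
      // Annotate a shuffle edge with key coders, as the frontends below do.
      static IREdge annotate(final IRVertex src, final IRVertex dst) {
        final IREdge edge = new IREdge(CommunicationPatternProperty.Value.Shuffle, src, dst);
        edge.setProperty(KeyEncoderProperty.of(LongEncoderFactory.of()));
        edge.setProperty(KeyDecoderProperty.of(LongDecoderFactory.of()));
        return edge;
      }
    }
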
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
deleted file mode 100644
index a4a03bf3e..000000000
--- a/common/src/main/java/edu/snu/nemo/common/ir/vertex/MetricCollectionBarrierVertex.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.common.ir.vertex;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.exception.DynamicOptimizationException;
-import edu.snu.nemo.common.ir.edge.IREdge;
-
-import java.util.*;
-
-/**
- * IRVertex that collects statistics to send them to the optimizer for dynamic optimization.
- * This class is generated in the DAG through
- * {edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.DataSkewCompositePass}.
- * @param <K> type of the key of metric data.
- * @param <V> type of the value of metric data.
- */
-public final class MetricCollectionBarrierVertex<K, V> extends IRVertex {
-  // Metric data used for dynamic optimization.
-  private Map<K, V> metricData;
-  private final List<String> blockIds;
-
-  // This DAG snapshot is taken at the end of the DataSkewCompositePass, for the vertex to know the state of the DAG at
-  // its optimization, and to be able to figure out exactly where in the DAG the vertex exists.
-  private DAG<IRVertex, IREdge> dagSnapshot;
-
-  /**
-   * Constructor for dynamic optimization vertex.
-   */
-  public MetricCollectionBarrierVertex() {
-    super();
-    this.metricData = new HashMap<>();
-    this.blockIds = new ArrayList<>();
-    this.dagSnapshot = null;
-  }
-
-  /**
-   * Constructor for dynamic optimization vertex.
-   *
-   * @param that the source object for copying
-   */
-  public MetricCollectionBarrierVertex(final MetricCollectionBarrierVertex<K, V> that) {
-    super(that);
-    this.metricData = new HashMap<>();
-    that.metricData.forEach(this.metricData::put);
-    this.blockIds = new ArrayList<>();
-    that.blockIds.forEach(this.blockIds::add);
-    this.dagSnapshot = that.dagSnapshot;
-  }
-
-  @Override
-  public MetricCollectionBarrierVertex getClone() {
-    return new MetricCollectionBarrierVertex(this);
-  }
-
-  /**
-   * This is to set the DAG snapshot at the end of the DataSkewCompositePass.
-   * @param dag DAG to set on the vertex.
-   */
-  public void setDAGSnapshot(final DAG<IRVertex, IREdge> dag) {
-    this.dagSnapshot = dag;
-  }
-
-  /**
-   * Access the DAG snapshot when triggering dynamic optimization.
-   * @return the DAG set to the vertex, or throws an exception otherwise.
-   */
-  public DAG<IRVertex, IREdge> getDAGSnapshot() {
-    if (this.dagSnapshot == null) {
-      throw new DynamicOptimizationException("MetricCollectionBarrierVertex must have been set with a DAG.");
-    }
-    return this.dagSnapshot;
-  }
-
-  /**
-   * Method for accumulating metrics in the vertex.
-   * @param metric map of hash value of the key of the block to the block size.
-   */
-  public void setMetricData(final Map<K, V> metric) {
-    metricData = metric;
-  }
-
-  /**
-   * Method for retrieving metrics from the vertex.
-   * @return the accumulated metric data.
-   */
-  public Map<K, V> getMetricData() {
-    return metricData;
-  }
-
-  /**
-   * Add block id that is needed for optimization in RuntimePass.
-   * @param blockId the block id subjected to the optimization.
-   */
-  public void addBlockId(final String blockId) {
-    blockIds.add(blockId);
-  }
-
-  /**
-   * Retrieve block ids.
-   * @return the block ids subjected to optimization.
-   */
-  public List<String> getBlockIds() {
-    return blockIds;
-  }
-
-  @Override
-  public String propertiesToJSON() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("{");
-    sb.append(irVertexPropertiesToString());
-    sb.append("}");
-    return sb.toString();
-  }
-}
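
Note that the responsibilities of the deleted barrier vertex are split between
the two transforms added below: MetricCollectTransform gathers task-level
statistics and AggregateMetricTransform aggregates them per stage, both hosted
in ordinary OperatorVertices instead of a dedicated vertex type.
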
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/AggregateMetricTransform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
new file mode 100644
index 000000000..8b81728c0
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/AggregateMetricTransform.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.BiFunction;
+
+/**
+ * A {@link Transform} that aggregates stage-level statistics sent to the master side optimizer
+ * for dynamic optimization.
+ *
+ * @param <I> input type.
+ * @param <O> output type.
+ */
+public final class AggregateMetricTransform<I, O> implements Transform<I, O> {
+  private static final Logger LOG = LoggerFactory.getLogger(AggregateMetricTransform.class.getName());
+  private OutputCollector<O> outputCollector;
+  private O aggregatedDynOptData;
+  private final BiFunction<Object, O, O> dynOptDataAggregator;
+
+  /**
+   * Default constructor.
+   */
+  public AggregateMetricTransform(final O aggregatedDynOptData,
+                                  final BiFunction<Object, O, O> dynOptDataAggregator) {
+    this.aggregatedDynOptData = aggregatedDynOptData;
+    this.dynOptDataAggregator = dynOptDataAggregator;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<O> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final I element) {
+    aggregatedDynOptData = dynOptDataAggregator.apply(element, aggregatedDynOptData);
+  }
+
+  @Override
+  public void close() {
+    outputCollector.emit(aggregatedDynOptData);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append(AggregateMetricTransform.class);
+    sb.append(":");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
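
The aggregation logic is injected as a serializable BiFunction. A condensed
sketch of a key-frequency aggregator, mirroring the one SkewReshapingPass
constructs later in this diff (simplified here with Map.merge; the class name
AggregatorExample is hypothetical):

    import edu.snu.nemo.common.Pair;
    import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiFunction;

    final class AggregatorExample {
      static AggregateMetricTransform<Pair<Object, Long>, Map<Object, Long>> keyFrequencyAggregator() {
        // Fold each (key, count) pair into the running map; the intersection cast
        // to Serializable lets the lambda be shipped to executors.
        final BiFunction<Object, Map<Object, Long>, Map<Object, Long>> aggregator =
          (BiFunction<Object, Map<Object, Long>, Map<Object, Long>> & Serializable) (element, acc) -> {
            final Pair<Object, Long> pair = (Pair<Object, Long>) element;
            acc.merge(pair.left(), pair.right(), Long::sum);
            return acc;
          };
        return new AggregateMetricTransform<>(new HashMap<>(), aggregator);
      }
    }
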
diff --git a/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java
new file mode 100644
index 000000000..c696d23a4
--- /dev/null
+++ b/common/src/main/java/edu/snu/nemo/common/ir/vertex/transform/MetricCollectTransform.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.common.ir.vertex.transform;
+
+import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.function.BiFunction;
+
+/**
+ * A {@link Transform} that collects task-level statistics used for dynamic optimization.
+ * The collected statistics are sent to the vertex with {@link AggregateMetricTransform} as a tagged output
+ * when this transform is closed.
+ *
+ * @param <I> input type.
+ * @param <O> output type.
+ */
+public final class MetricCollectTransform<I, O> implements Transform<I, O> {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricCollectTransform.class.getName());
+  private OutputCollector<O> outputCollector;
+  private O dynOptData;
+  private final BiFunction<Object, O, O> dynOptDataCollector;
+  private final BiFunction<O, OutputCollector, O> closer;
+
+  /**
+   * MetricCollectTransform constructor.
+   */
+  public MetricCollectTransform(final O dynOptData,
+                                final BiFunction<Object, O, O> dynOptDataCollector,
+                                final BiFunction<O, OutputCollector, O> closer) {
+    this.dynOptData = dynOptData;
+    this.dynOptDataCollector = dynOptDataCollector;
+    this.closer = closer;
+  }
+
+  @Override
+  public void prepare(final Context context, final OutputCollector<O> oc) {
+    this.outputCollector = oc;
+  }
+
+  @Override
+  public void onData(final I element) {
+    dynOptData = dynOptDataCollector.apply(element, dynOptData);
+  }
+
+  @Override
+  public void close() {
+    closer.apply(dynOptData, outputCollector);
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append(MetricCollectTransform.class);
+    sb.append(":");
+    sb.append(super.toString());
+    return sb.toString();
+  }
+}
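
Its task-side counterpart takes a per-element collector plus a closer that
flushes the collected map as tagged output. A condensed sketch, mirroring
generateMetricCollectVertex in SkewReshapingPass below (abvId stands in for
the aggregation vertex's id; the class name CollectorExample is hypothetical):

    import edu.snu.nemo.common.Pair;
    import edu.snu.nemo.common.ir.OutputCollector;
    import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiFunction;

    final class CollectorExample {
      static MetricCollectTransform<Object, Map<Object, Object>> keyFrequencyCollector(final String abvId) {
        // Count one occurrence per element (here the element itself is the key).
        final BiFunction<Object, Map<Object, Object>, Map<Object, Object>> collector =
          (BiFunction<Object, Map<Object, Object>, Map<Object, Object>> & Serializable) (element, data) -> {
            data.merge(element, 1L, (old, one) -> (Long) old + 1L);
            return data;
          };
        // On close, emit each (key, count) pair to the aggregation vertex.
        final BiFunction<Map<Object, Object>, OutputCollector, Map<Object, Object>> closer =
          (BiFunction<Map<Object, Object>, OutputCollector, Map<Object, Object>> & Serializable) (data, oc) -> {
            data.forEach((k, v) -> oc.emit(abvId, Pair.of(k, v)));
            return data;
          };
        return new MetricCollectTransform<>(new HashMap<>(), collector, closer);
      }
    }
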
diff --git a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
index 9c02ffd82..e1ae497da 100644
--- a/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
+++ b/common/src/main/java/edu/snu/nemo/common/test/EmptyComponents.java
@@ -15,16 +15,23 @@
  */
 package edu.snu.nemo.common.test;
 
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.coder.DecoderFactory;
+import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.Readable;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
+import edu.snu.nemo.common.ir.edge.executionproperty.KeyExtractorProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.SourceVertex;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
+import org.apache.beam.sdk.values.KV;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -38,6 +45,10 @@
   private EmptyComponents() {
   }
 
+  /**
+   * Builds dummy IR DAG for testing.
+   * @return the dummy IR DAG.
+   */
   public static DAG<IRVertex, IREdge> buildEmptyDAG() {
     DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
     final IRVertex s = new EmptyComponents.EmptySourceVertex<>("s");
@@ -60,6 +71,61 @@ private EmptyComponents() {
     return dagBuilder.build();
   }
 
+  /**
+   * Builds dummy IR DAG to test skew handling.
+   * For DataSkewPolicy, shuffle edges need extra settings for EncoderProperty, DecoderProperty
+   * and KeyExtractorProperty by default.
+   * @return the dummy IR DAG.
+   */
+  public static DAG<IRVertex, IREdge> buildEmptyDAGForSkew() {
+    DAGBuilder<IRVertex, IREdge> dagBuilder = new DAGBuilder<>();
+    final IRVertex s = new EmptyComponents.EmptySourceVertex<>("s");
+    final IRVertex t1 = new OperatorVertex(new EmptyComponents.EmptyTransform("t1"));
+    final IRVertex t2 = new OperatorVertex(new EmptyComponents.EmptyTransform("t2"));
+    final IRVertex t3 = new OperatorVertex(new EmptyComponents.EmptyTransform("t3"));
+    final IRVertex t4 = new OperatorVertex(new EmptyComponents.EmptyTransform("t4"));
+    final IRVertex t5 = new OperatorVertex(new EmptyComponents.EmptyTransform("t5"));
+
+    final IREdge shuffleEdgeBetweenT1AndT2 = new IREdge(CommunicationPatternProperty.Value.Shuffle, t1, t2);
+    shuffleEdgeBetweenT1AndT2.setProperty(KeyExtractorProperty.of(new DummyBeamKeyExtractor()));
+    shuffleEdgeBetweenT1AndT2.setProperty(EncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
+    shuffleEdgeBetweenT1AndT2.setProperty(DecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
+
+    final IREdge shuffleEdgeBetweenT3AndT4 = new IREdge(CommunicationPatternProperty.Value.Shuffle, t3, t4);
+    shuffleEdgeBetweenT3AndT4.setProperty(KeyExtractorProperty.of(new DummyBeamKeyExtractor()));
+    shuffleEdgeBetweenT3AndT4.setProperty(EncoderProperty.of(new EncoderFactory.DummyEncoderFactory()));
+    shuffleEdgeBetweenT3AndT4.setProperty(DecoderProperty.of(new DecoderFactory.DummyDecoderFactory()));
+
+    dagBuilder.addVertex(s);
+    dagBuilder.addVertex(t1);
+    dagBuilder.addVertex(t2);
+    dagBuilder.addVertex(t3);
+    dagBuilder.addVertex(t4);
+    dagBuilder.addVertex(t5);
+    dagBuilder.connectVertices(new IREdge(CommunicationPatternProperty.Value.OneToOne, s, t1));
+    dagBuilder.connectVertices(shuffleEdgeBetweenT1AndT2);
+    dagBuilder.connectVertices(new IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t3));
+    dagBuilder.connectVertices(shuffleEdgeBetweenT3AndT4);
+    dagBuilder.connectVertices(new IREdge(CommunicationPatternProperty.Value.OneToOne, t2, t5));
+    return dagBuilder.build();
+  }
+
+  /**
+   * Dummy beam key extractor.
+   */
+  static class DummyBeamKeyExtractor implements KeyExtractor {
+    @Override
+    public Object extractKey(final Object element) {
+      if (element instanceof KV) {
+        // Handle null keys, since Beam allows KV with null keys.
+        final Object key = ((KV) element).getKey();
+        return key == null ? 0 : key;
+      } else {
+        return element;
+      }
+    }
+  }
+
   /**
    * An empty transform.
    *
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/PipelineTranslator.java
index a744ae89e..d334d1513 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -430,15 +430,25 @@ private void addEdgeTo(final IRVertex dst, final PValue input) {
         throw new RuntimeException(String.format("While adding an edge from %s, to %s, coder for PValue %s cannot "
             + "be determined", src, dst, input));
       }
+
+      edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
+
+      if (coder instanceof KvCoder) {
+        Coder keyCoder = ((KvCoder) coder).getKeyCoder();
+        edge.setProperty(KeyEncoderProperty.of(new BeamEncoderFactory(keyCoder)));
+        edge.setProperty(KeyDecoderProperty.of(new BeamDecoderFactory(keyCoder)));
+      }
       edge.setProperty(EncoderProperty.of(new BeamEncoderFactory<>(coder)));
       edge.setProperty(DecoderProperty.of(new BeamDecoderFactory<>(coder)));
+
       if (pValueToTag.containsKey(input)) {
         edge.setProperty(AdditionalOutputTagProperty.of(pValueToTag.get(input).getId()));
       }
+
       if (input instanceof PCollectionView) {
         edge.setProperty(BroadcastVariableIdProperty.of((PCollectionView) input));
       }
-      edge.setProperty(KeyExtractorProperty.of(new BeamKeyExtractor()));
+
       builder.connectVertices(edge);
     }
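
For context on the KvCoder branch above: Beam models key-value elements as KV
and their coder as a KvCoder, whose key coder the translator wraps in
BeamEncoderFactory/BeamDecoderFactory. A small illustration with standard Beam
coders (not part of this diff):

    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarIntCoder;

    final class KvCoderExample {
      static Coder<String> keyCoderOf() {
        // A coder for KV<String, Integer>; getKeyCoder() yields the String coder.
        final KvCoder<String, Integer> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of());
        return kvCoder.getKeyCoder();
      }
    }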
 
diff --git a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
index 7ebea3865..f2aedaab2 100644
--- a/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
+++ b/compiler/frontend/beam/src/main/java/edu/snu/nemo/compiler/frontend/beam/coder/BeamDecoderFactory.java
@@ -19,15 +19,19 @@
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 /**
  * {@link DecoderFactory} from {@link org.apache.beam.sdk.coders.Coder}.
- * @param <T> the type of element to encode.
+ * @param <T> the type of element to decode.
  */
 public final class BeamDecoderFactory<T> implements DecoderFactory<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BeamDecoderFactory.class);
+
   private final Coder<T> beamCoder;
 
   /**
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
index 14f764a9e..e4f02a387 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/PairRDDFunctions.scala
@@ -19,7 +19,7 @@ import java.util
 
 import edu.snu.nemo.common.dag.DAGBuilder
 import edu.snu.nemo.common.ir.edge.IREdge
-import edu.snu.nemo.common.ir.edge.executionproperty.{DecoderProperty, EncoderProperty, KeyExtractorProperty}
+import edu.snu.nemo.common.ir.edge.executionproperty._
 import edu.snu.nemo.common.ir.executionproperty.EdgeExecutionProperty
 import edu.snu.nemo.common.ir.vertex.{IRVertex, LoopVertex, OperatorVertex}
 import edu.snu.nemo.compiler.frontend.spark.SparkKeyExtractor
@@ -77,6 +77,10 @@ final class PairRDDFunctions[K: ClassTag, V: ClassTag] protected[rdd] (
     newEdge.setProperty(
       DecoderProperty.of(new SparkDecoderFactory[Tuple2[K, V]](self.serializer))
         .asInstanceOf[EdgeExecutionProperty[_ <: Serializable]])
+    // For Tuple2 type data, set KeyEncoderProperty and KeyDecoderProperty
+    // in case it is subjected to dynamic optimization.
+    newEdge.setProperty(KeyEncoderProperty.of(new SparkEncoderFactory[K](self.serializer)))
+    newEdge.setProperty(KeyDecoderProperty.of(new SparkDecoderFactory[K](self.serializer)))
     newEdge.setProperty(KeyExtractorProperty.of(new SparkKeyExtractor))
     builder.connectVertices(newEdge)
 
diff --git a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
index c253f170e..abcebc837 100644
--- a/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
+++ b/compiler/frontend/spark/src/main/scala/edu/snu/nemo/compiler/frontend/spark/core/rdd/RDD.scala
@@ -30,7 +30,6 @@ import edu.snu.nemo.compiler.frontend.spark.{SparkBroadcastVariables, SparkKeyEx
 import edu.snu.nemo.compiler.frontend.spark.coder.{SparkDecoderFactory, SparkEncoderFactory}
 import edu.snu.nemo.compiler.frontend.spark.core.SparkFrontendUtils
 import edu.snu.nemo.compiler.frontend.spark.transform._
-import org.apache.commons.lang.SerializationUtils
 import org.apache.hadoop.io.WritableFactory
 import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.spark.api.java.function.{FlatMapFunction, Function, Function2}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
deleted file mode 100644
index 984e8dfd0..000000000
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewDataStorePass.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating;
-
-import edu.snu.nemo.common.dag.DAG;
-import edu.snu.nemo.common.ir.edge.IREdge;
-import edu.snu.nemo.common.ir.edge.executionproperty.DataStoreProperty;
-import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
-
-/**
- * Pass to annotate the DAG for a job to perform data skew.
- * It specifies the incoming one-to-one edges to MetricCollectionVertices to have either MemoryStore or LocalFileStore
- * as its DataStore ExecutionProperty.
- */
-@Annotates(DataStoreProperty.class)
-public final class SkewDataStorePass extends AnnotatingPass {
-  /**
-   * Default constructor.
-   */
-  public SkewDataStorePass() {
-    super(SkewDataStorePass.class);
-  }
-
-  @Override
-  public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.topologicalDo(v -> {
-      // we only care about metric collection barrier vertices.
-      if (v instanceof MetricCollectionBarrierVertex) {
-        // We use memory for just a single inEdge, to make use of locality of stages: {@link PhysicalPlanGenerator}.
-        final IREdge edgeToUseMemory = dag.getIncomingEdgesOf(v).stream().findFirst().orElseThrow(() ->
-            new RuntimeException("This MetricCollectionBarrierVertex doesn't have any incoming edges: " + v.getId()));
-        dag.getIncomingEdgesOf(v).forEach(edge -> {
-          // we want it to be in the same stage
-          if (edge.equals(edgeToUseMemory)) {
-            edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.MemoryStore));
-          } else {
-            edge.setPropertyPermanently(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
-          }
-        });
-      }
-    });
-    return dag;
-  }
-}
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
index ad635c6c3..95f1cd0e3 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewMetricCollectionPass.java
@@ -19,14 +19,17 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 /**
- * Pass to annotate the DAG for a job to perform data skew.
- * It specifies the outgoing Shuffle edges from MetricCollectionVertices with a MetricCollection ExecutionProperty
- * which lets the edge to know what metric collection it should perform.
+ * Pass to annotate the IR DAG for skew handling.
+ *
+ * It specifies the target of dynamic optimization for skew handling
+ * by setting appropriate {@link MetricCollectionProperty} to
+ * outgoing shuffle edges from vertices with {@link MetricCollectTransform}.
  */
 @Annotates(MetricCollectionProperty.class)
 @Requires(CommunicationPatternProperty.class)
@@ -41,8 +44,9 @@ public SkewMetricCollectionPass() {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.topologicalDo(v -> {
-      // we only care about metric collection barrier vertices.
-      if (v instanceof MetricCollectionBarrierVertex) {
+      // we only care about metric collection vertices.
+      if (v instanceof OperatorVertex
+        && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform) {
         dag.getOutgoingEdgesOf(v).forEach(edge -> {
           // double checking.
           if (edge.getPropertyValue(CommunicationPatternProperty.class).get()
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
index 7ffd221b9..ab34ffffb 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewPartitionerPass.java
@@ -19,8 +19,9 @@
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.Requires;
 
 import java.util.List;
@@ -40,9 +41,10 @@ public SkewPartitionerPass() {
 
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
-    dag.getVertices().forEach(vertex -> {
-      if (vertex instanceof MetricCollectionBarrierVertex) {
-        final List<IREdge> outEdges = dag.getOutgoingEdgesOf(vertex);
+    dag.getVertices().forEach(v -> {
+      if (v instanceof OperatorVertex
+        && ((OperatorVertex) v).getTransform() instanceof AggregateMetricTransform) {
+        final List<IREdge> outEdges = dag.getOutgoingEdgesOf(v);
         outEdges.forEach(edge -> {
           // double checking.
           if (MetricCollectionProperty.Value.DataSkewRuntimePass
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
index c3739aff1..806cef186 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/annotating/SkewResourceSkewedDataPass.java
@@ -18,15 +18,19 @@
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.ir.vertex.executionproperty.DynamicOptimizationProperty;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
 
 import java.util.List;
 
 /**
- * Pass to annotate the DAG for a job to perform data skew.
- * It specifies which optimization to perform on the MetricCollectionBarrierVertex.
+ * Pass to annotate the IR DAG for skew handling.
+ *
+ * It marks children and descendants of a vertex with {@link MetricCollectTransform},
+ * which collects task-level statistics used for dynamic optimization,
+ * with {@link ResourceSkewedDataProperty} to perform skewness-aware scheduling.
  */
 @Annotates(DynamicOptimizationProperty.class)
 public final class SkewResourceSkewedDataPass extends AnnotatingPass {
@@ -37,11 +41,12 @@ public SkewResourceSkewedDataPass() {
     super(SkewResourceSkewedDataPass.class);
   }
 
-  private boolean hasMetricCollectionBarrierVertexAsParent(final DAG<IRVertex, IREdge> dag,
-                                                           final IRVertex v) {
+  private boolean hasParentWithMetricCollectTransform(final DAG<IRVertex, IREdge> dag,
+                                                      final IRVertex v) {
     List<IRVertex> parents = dag.getParents(v.getId());
     for (IRVertex parent : parents) {
-      if (parent instanceof MetricCollectionBarrierVertex) {
+      if (parent instanceof OperatorVertex
+        && ((OperatorVertex) parent).getTransform() instanceof MetricCollectTransform) {
         return true;
       }
     }
@@ -51,12 +56,13 @@ private boolean hasMetricCollectionBarrierVertexAsParent(final DAG<IRVertex, IRE
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     dag.getVertices().stream()
-        .filter(v -> v instanceof MetricCollectionBarrierVertex)
-        .forEach(v -> v.setProperty(DynamicOptimizationProperty
+        .filter(v -> v instanceof OperatorVertex
+          && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform)
+        .forEach(v -> v.setProperty(DynamicOptimizationProperty
             .of(DynamicOptimizationProperty.Value.DataSkewRuntimePass)));
 
     dag.getVertices().stream()
-        .filter(v -> hasMetricCollectionBarrierVertexAsParent(dag, v)
+        .filter(v -> hasParentWithMetricCollectTransform(dag, v)
             && !v.getExecutionProperties().containsKey(ResourceSkewedDataProperty.class))
         .forEach(childV -> {
           childV.getExecutionProperties().put(ResourceSkewedDataProperty.of(true));
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
index 5e3d8fd1e..1be97fabb 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePass.java
@@ -22,11 +22,6 @@
 
 /**
  * Pass to modify the DAG for a job to perform data skew.
- * It adds a {@link edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex} before Shuffle edges,
- * to make a barrier before it, and to use the metrics to repartition the skewed data.
- * NOTE: we currently put the SkewCompositePass at the end of the list for each policies, as it needs to take a
- * snapshot at the end of the pass. This could be prevented by modifying other passes to take the snapshot of the DAG
- * at the end of each passes for metricCollectionVertices.
  */
 public final class SkewCompositePass extends CompositePass {
   /**
@@ -36,9 +31,7 @@ public SkewCompositePass() {
     super(Arrays.asList(
         new SkewReshapingPass(),
         new SkewResourceSkewedDataPass(),
-        new SkewDataStorePass(),
-        new SkewMetricCollectionPass(),
-        new SkewPartitionerPass()
+        new SkewMetricCollectionPass()
     ));
   }
 }
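
With the store and partitioner passes dropped from the list, the composite can
be exercised directly against the dummy skew DAG added in EmptyComponents
above. A hedged sketch (it assumes CompositePass exposes apply(DAG) like the
individual passes in this diff; the class name SkewPassExample is
hypothetical):

    import edu.snu.nemo.common.dag.DAG;
    import edu.snu.nemo.common.ir.edge.IREdge;
    import edu.snu.nemo.common.ir.vertex.IRVertex;
    import edu.snu.nemo.common.test.EmptyComponents;
    import edu.snu.nemo.compiler.optimizer.pass.compiletime.composite.SkewCompositePass;

    final class SkewPassExample {
      static DAG<IRVertex, IREdge> reshapeForSkew() {
        // Build the dummy DAG with annotated shuffle edges, then let the composite
        // insert the metric-collect and aggregation vertices.
        final DAG<IRVertex, IREdge> dag = EmptyComponents.buildEmptyDAGForSkew();
        return new SkewCompositePass().apply(dag);
      }
    }
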
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
index 571476ddf..fe12b950c 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/reshaping/SkewReshapingPass.java
@@ -15,30 +15,44 @@
  */
 package edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping;
 
+import edu.snu.nemo.common.KeyExtractor;
+import edu.snu.nemo.common.Pair;
+import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
+import edu.snu.nemo.common.ir.OutputCollector;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.*;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.DecoderProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.EncoderProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.Requires;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
 
 /**
- * Pass to modify the DAG for a job to perform data skew.
- * It adds a {@link MetricCollectionBarrierVertex} before Shuffle edges, to make a barrier before it,
- * and to use the metrics to repartition the skewed data.
- * NOTE: we currently put the SkewCompositePass at the end of the list for each policies, as it needs to take
- * a snapshot at the end of the pass. This could be prevented by modifying other passes to take the snapshot of the
- * DAG at the end of each passes for metricCollectionVertices.
- */
+ * Pass to reshape the IR DAG for skew handling.
+ *
+ * This pass inserts vertices to perform two-step dynamic optimization for skew handling.
+ * 1) Task-level statistics collection is done via a vertex with {@link MetricCollectTransform}.
+ * 2) Stage-level statistics aggregation is done via a vertex with {@link AggregateMetricTransform},
+ * inserted before shuffle edges.
+ */
 @Requires(CommunicationPatternProperty.class)
 public final class SkewReshapingPass extends ReshapingPass {
+  private static final Logger LOG = LoggerFactory.getLogger(SkewReshapingPass.class.getName());
+
   /**
    * Default constructor.
    */
@@ -49,33 +63,38 @@ public SkewReshapingPass() {
   @Override
   public DAG<IRVertex, IREdge> apply(final DAG<IRVertex, IREdge> dag) {
     final DAGBuilder<IRVertex, IREdge> builder = new DAGBuilder<>();
-    final List<MetricCollectionBarrierVertex> metricCollectionVertices = new ArrayList<>();
+    final List<OperatorVertex> metricCollectVertices = new ArrayList<>();
 
     dag.topologicalDo(v -> {
-      // We care about OperatorVertices that have any incoming edges that are of type Shuffle.
+      // We care about OperatorVertices that have shuffle incoming edges with main output.
+      // TODO #210: Data-aware dynamic optimization at run-time
       if (v instanceof OperatorVertex && dag.getIncomingEdgesOf(v).stream().anyMatch(irEdge ->
           CommunicationPatternProperty.Value.Shuffle
-          .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))) {
-        final MetricCollectionBarrierVertex<Integer, Long> metricCollectionBarrierVertex
-            = new MetricCollectionBarrierVertex<>();
-        metricCollectionVertices.add(metricCollectionBarrierVertex);
-        builder.addVertex(v);
-        builder.addVertex(metricCollectionBarrierVertex);
+          .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))
+        && dag.getIncomingEdgesOf(v).stream().noneMatch(irEdge ->
+      irEdge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent())) {
+
         dag.getIncomingEdgesOf(v).forEach(edge -> {
-          // we insert the metric collection vertex when we meet a shuffle edge
           if (CommunicationPatternProperty.Value.Shuffle
                 .equals(edge.getPropertyValue(CommunicationPatternProperty.class).get())) {
-            // We then insert the dynamicOptimizationVertex between the vertex and incoming vertices.
-            final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.OneToOne,
-                edge.getSrc(), metricCollectionBarrierVertex);
-            newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
-            newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
-
-            final IREdge edgeToGbK = new IREdge(
-              edge.getPropertyValue(CommunicationPatternProperty.class).get(), metricCollectionBarrierVertex, v);
-            edge.copyExecutionPropertiesTo(edgeToGbK);
-            builder.connectVertices(newEdge);
-            builder.connectVertices(edgeToGbK);
+            final OperatorVertex abv = generateMetricAggregationVertex();
+            final OperatorVertex mcv = generateMetricCollectVertex(edge, abv);
+            metricCollectVertices.add(mcv);
+            builder.addVertex(v);
+            builder.addVertex(mcv);
+            builder.addVertex(abv);
+
+            // We then insert the vertex with MetricCollectTransform and vertex with AggregateMetricTransform
+            // between the vertex and incoming vertices.
+            final IREdge edgeToMCV = generateEdgeToMCV(edge, mcv);
+            final IREdge edgeToABV = generateEdgeToABV(edge, mcv, abv);
+            final IREdge edgeToOriginalDstV =
+              new IREdge(edge.getPropertyValue(CommunicationPatternProperty.class).get(), edge.getSrc(), v);
+            edge.copyExecutionPropertiesTo(edgeToOriginalDstV);
+
+            builder.connectVertices(edgeToMCV);
+            builder.connectVertices(edgeToABV);
+            builder.connectVertices(edgeToOriginalDstV);
           } else {
             builder.connectVertices(edge);
           }
@@ -86,7 +105,97 @@ public SkewReshapingPass() {
       }
     });
     final DAG<IRVertex, IREdge> newDAG = builder.build();
-    metricCollectionVertices.forEach(v -> v.setDAGSnapshot(newDAG));
     return newDAG;
   }
+
+  private OperatorVertex generateMetricAggregationVertex() {
+    // Define a custom data aggregator for skew handling.
+    // Here, the aggregator gathers key frequency data used in shuffle data repartitioning.
+    final BiFunction<Object, Map<Object, Long>, Map<Object, Long>> dynOptDataAggregator =
+      (BiFunction<Object, Map<Object, Long>, Map<Object, Long>> & Serializable)
+      (element, aggregatedDynOptData) -> {
+        final Object key = ((Pair<Object, Long>) element).left();
+        final Long count = ((Pair<Object, Long>) element).right();
+
+        final Map<Object, Long> aggregatedDynOptDataMap = (Map<Object, Long>) aggregatedDynOptData;
+        if (aggregatedDynOptDataMap.containsKey(key)) {
+          aggregatedDynOptDataMap.compute(key, (existingKey, accumulatedCount) -> accumulatedCount + count);
+        } else {
+          aggregatedDynOptDataMap.put(key, count);
+        }
+        return aggregatedDynOptData;
+      };
+    final AggregateMetricTransform abt =
+      new AggregateMetricTransform<Pair<Object, Long>, Map<Object, Long>>(new HashMap<>(), dynOptDataAggregator);
+    return new OperatorVertex(abt);
+  }
+
+  private OperatorVertex generateMetricCollectVertex(final IREdge edge, final OperatorVertex abv) {
+    final KeyExtractor keyExtractor = edge.getPropertyValue(KeyExtractorProperty.class).get();
+    // Define a custom data collector for skew handling.
+    // Here, the collector gathers key frequency data used in shuffle data repartitioning.
+    final BiFunction<Object, Map<Object, Object>, Map<Object, Object>> dynOptDataCollector =
+      (BiFunction<Object, Map<Object, Object>, Map<Object, Object>> & Serializable)
+        (element, dynOptData) -> {
+          Object key = keyExtractor.extractKey(element);
+          if (dynOptData.containsKey(key)) {
+            dynOptData.compute(key, (existingKey, existingCount) -> (long) existingCount + 1L);
+          } else {
+            dynOptData.put(key, 1L);
+          }
+          return dynOptData;
+        };
+
+    // Define a custom transform closer for skew handling.
+    // Here, we emit key to frequency data map type data when closing transform.
+    final BiFunction<Map<Object, Object>, OutputCollector, Map<Object, Object>> closer =
+      (BiFunction<Map<Object, Object>, OutputCollector, Map<Object, Object>> & Serializable)
+        (dynOptData, outputCollector) -> {
+          dynOptData.forEach((k, v) -> {
+            final Pair<Object, Object> pairData = Pair.of(k, v);
+            outputCollector.emit(abv.getId(), pairData);
+          });
+          return dynOptData;
+        };
+
+    final MetricCollectTransform mct
+      = new MetricCollectTransform(new HashMap<>(), dynOptDataCollector, closer);
+    return new OperatorVertex(mct);
+  }
+
+  private IREdge generateEdgeToMCV(final IREdge edge, final OperatorVertex mcv) {
+    final IREdge newEdge =
+      new IREdge(CommunicationPatternProperty.Value.OneToOne, edge.getSrc(), mcv);
+    newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+    newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
+    return newEdge;
+  }
+
+  private IREdge generateEdgeToABV(final IREdge edge,
+                                   final OperatorVertex mcv,
+                                   final OperatorVertex abv) {
+    final IREdge newEdge = new IREdge(CommunicationPatternProperty.Value.Shuffle, mcv, abv);
+    newEdge.setProperty(DataStoreProperty.of(DataStoreProperty.Value.LocalFileStore));
+    newEdge.setProperty(DataPersistenceProperty.of(DataPersistenceProperty.Value.Keep));
+    newEdge.setProperty(DataFlowProperty.of(DataFlowProperty.Value.Pull));
+    newEdge.setProperty(KeyExtractorProperty.of(edge.getPropertyValue(KeyExtractorProperty.class).get()));
+    newEdge.setProperty(AdditionalOutputTagProperty.of("DynOptData"));
+
+    // Dynamic optimization handles statistics on key-value data by default.
+    // We need key coders for encoding/decoding the keys when sending data to
+    // the vertex with AggregateMetricTransform.
+    if (edge.getPropertyValue(KeyEncoderProperty.class).isPresent()
+      && edge.getPropertyValue(KeyDecoderProperty.class).isPresent()) {
+      final EncoderFactory keyEncoderFactory = edge.getPropertyValue(KeyEncoderProperty.class).get();
+      final DecoderFactory keyDecoderFactory = edge.getPropertyValue(KeyDecoderProperty.class).get();
+      newEdge.setProperty(EncoderProperty.of(PairEncoderFactory.of(keyEncoderFactory, LongEncoderFactory.of())));
+      newEdge.setProperty(DecoderProperty.of(PairDecoderFactory.of(keyDecoderFactory, LongDecoderFactory.of())));
+    } else {
+      // If not specified, follow encoder/decoder of the given shuffle edge.
+      newEdge.setProperty(EncoderProperty.of(edge.getPropertyValue(EncoderProperty.class).get()));
+      newEdge.setProperty(DecoderProperty.of(edge.getPropertyValue(DecoderProperty.class).get()));
+    }
+
+    return newEdge;
+  }
 }
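
The two vertices built above form a small collect-then-aggregate pipeline: each
task's MetricCollectTransform counts key frequencies, its closer emits the
(key, count) pairs, and the AggregateMetricTransform folds those pairs into a
single map. A minimal JDK-only sketch of that logic (class and variable names
here are illustrative stand-ins, not part of this PR):

    import java.util.HashMap;
    import java.util.Map;

    public final class SkewMetricSketch {
      public static void main(final String[] args) {
        // MetricCollectTransform side: count key frequencies while processing elements.
        final Map<Object, Long> dynOptData = new HashMap<>();
        for (final String element : new String[]{"a", "b", "a", "a", "c"}) {
          final Object key = element; // stands in for keyExtractor.extractKey(element)
          dynOptData.merge(key, 1L, Long::sum);
        }

        // Closer side: each (key, frequency) pair is emitted to the aggregating vertex.
        // AggregateMetricTransform side: fold the pairs from all tasks into one map.
        final Map<Object, Long> aggregatedDynOptData = new HashMap<>();
        dynOptData.forEach((key, count) ->
            aggregatedDynOptData.merge(key, count, Long::sum)); // dynOptDataAggregator logic

        System.out.println(aggregatedDynOptData); // {a=3, b=1, c=1} (order may vary)
      }
    }
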
diff --git a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
index e8e7a7cba..801cbede8 100644
--- a/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
+++ b/compiler/optimizer/src/main/java/edu/snu/nemo/compiler/optimizer/policy/DataSkewPolicy.java
@@ -31,10 +31,11 @@
 public final class DataSkewPolicy implements Policy {
   public static final PolicyBuilder BUILDER =
       new PolicyBuilder()
-          .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
-              new SkewCompositePass())
-          .registerCompileTimePass(new LoopOptimizationCompositePass())
-          .registerCompileTimePass(new DefaultCompositePass());
+        .registerRuntimePass(new DataSkewRuntimePass().setNumSkewedKeys(DataSkewRuntimePass.DEFAULT_NUM_SKEWED_KEYS),
+          new SkewCompositePass())
+        .registerCompileTimePass(new LoopOptimizationCompositePass())
+        .registerCompileTimePass(new DefaultCompositePass());
+
   private final Policy policy;
 
   /**
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
index 79191ba8c..35ddbb2e4 100644
--- a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyBuilderTest.java
@@ -38,7 +38,7 @@ public void testTransientResourcePolicy() {
 
   @Test
   public void testDataSkewPolicy() {
-    assertEquals(22, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
+    assertEquals(20, DataSkewPolicy.BUILDER.getCompileTimePasses().size());
     assertEquals(1, DataSkewPolicy.BUILDER.getRuntimePasses().size());
   }
 
diff --git a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
index 2b2c7b782..66aa36683 100644
--- a/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
+++ b/compiler/optimizer/src/test/java/edu/snu/nemo/compiler/optimizer/policy/PolicyImplTest.java
@@ -18,6 +18,9 @@
 
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.exception.CompileTimeOptimizationException;
+import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.vertex.IRVertex;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.CompileTimePass;
 import edu.snu.nemo.runtime.common.optimizer.pass.runtime.RuntimePass;
@@ -31,10 +34,12 @@
 
 public final class PolicyImplTest {
   private DAG dag;
+  private DAG dagForSkew;
 
   @Before
   public void setUp() {
     this.dag = EmptyComponents.buildEmptyDAG();
+    this.dagForSkew = EmptyComponents.buildEmptyDAGForSkew();
   }
 
   @Rule
@@ -55,7 +60,7 @@ public void testDisaggregationPolicy() throws Exception {
   @Test
   public void testDataSkewPolicy() throws Exception {
     // this should run without an exception.
-    DataSkewPolicy.BUILDER.build().runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+    DataSkewPolicy.BUILDER.build().runCompileTimeOptimization(dagForSkew, DAG.EMPTY_DAG_DIRECTORY);
   }
 
   @Test
@@ -110,6 +115,6 @@ public void testDataSkewAndLargeShuffleCombination() throws Exception {
     // This should throw an exception.
     // DataSizeMetricCollection is not compatible with Push (All data have to be stored before the data collection).
     expectedException.expect(CompileTimeOptimizationException.class);
-    combinedPolicy.runCompileTimeOptimization(dag, DAG.EMPTY_DAG_DIRECTORY);
+    combinedPolicy.runCompileTimeOptimization(dagForSkew, DAG.EMPTY_DAG_DIRECTORY);
   }
 }
diff --git a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
index 1cd8996be..b04b84946 100644
--- a/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
+++ b/compiler/test/src/test/java/edu/snu/nemo/compiler/optimizer/pass/compiletime/composite/SkewCompositePassTest.java
@@ -18,12 +18,13 @@
 import edu.snu.nemo.client.JobLauncher;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.ir.edge.IREdge;
+import edu.snu.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.MetricCollectionProperty;
-import edu.snu.nemo.common.ir.edge.executionproperty.PartitionerProperty;
 import edu.snu.nemo.common.ir.vertex.IRVertex;
-import edu.snu.nemo.common.ir.vertex.MetricCollectionBarrierVertex;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionProperty;
+import edu.snu.nemo.common.ir.vertex.OperatorVertex;
+import edu.snu.nemo.common.ir.vertex.transform.MetricCollectTransform;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import edu.snu.nemo.compiler.CompilerTestUtil;
 import edu.snu.nemo.common.ir.vertex.executionproperty.ResourceSkewedDataProperty;
 import edu.snu.nemo.compiler.optimizer.pass.compiletime.annotating.AnnotatingPass;
@@ -35,7 +36,6 @@
 
 import java.util.HashSet;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -48,7 +48,7 @@
 @PrepareForTest(JobLauncher.class)
 public class SkewCompositePassTest {
   private DAG<IRVertex, IREdge> mrDAG;
-  private static final long NUM_OF_PASSES_IN_DATA_SKEW_PASS = 5;
+  private static final long NUM_OF_PASSES_IN_DATA_SKEW_PASS = 3;
 
   @Before
   public void setUp() throws Exception {
@@ -74,37 +74,30 @@ public void testCompositePass() {
   }
 
   /**
-   * Test for {@link SkewCompositePass} with MR workload. It must insert a {@link MetricCollectionBarrierVertex}
-   * before each shuffle edge.
+   * Test for {@link SkewCompositePass} with MR workload.
+   * It should insert a vertex with {@link MetricCollectTransform}
+   * and a vertex with {@link AggregateMetricTransform}
+   * before each shuffle edge that has no additional output tag.
    * @throws Exception exception on the way.
    */
   @Test
   public void testDataSkewPass() throws Exception {
     mrDAG = CompilerTestUtil.compileWordCountDAG();
     final Integer originalVerticesNum = mrDAG.getVertices().size();
-    final Long numOfShuffleGatherEdges = mrDAG.getVertices().stream().filter(irVertex ->
+    final Long numOfShuffleEdgesWithoutAdditionalOutputTag =
+      mrDAG.getVertices().stream().filter(irVertex ->
         mrDAG.getIncomingEdgesOf(irVertex).stream().anyMatch(irEdge ->
-            CommunicationPatternProperty.Value.Shuffle
-            .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get())))
-        .count();
+          CommunicationPatternProperty.Value.Shuffle
+            .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get())
+            && !irEdge.getPropertyValue(AdditionalOutputTagProperty.class).isPresent()))
+      .count();
     final DAG<IRVertex, IREdge> processedDAG = new SkewCompositePass().apply(mrDAG);
+    assertEquals(originalVerticesNum + numOfShuffleEdgesWithoutAdditionalOutputTag * 2,
+      processedDAG.getVertices().size());
 
-    assertEquals(originalVerticesNum + numOfShuffleGatherEdges, processedDAG.getVertices().size());
-    processedDAG.getVertices().stream().map(processedDAG::getIncomingEdgesOf)
-        .flatMap(List::stream)
-        .filter(irEdge -> CommunicationPatternProperty.Value.Shuffle
-            .equals(irEdge.getPropertyValue(CommunicationPatternProperty.class).get()))
-        .map(IREdge::getSrc)
-        .forEach(irVertex -> assertTrue(irVertex instanceof MetricCollectionBarrierVertex));
-
-    processedDAG.getVertices().forEach(v -> processedDAG.getOutgoingEdgesOf(v).stream()
-        .filter(e -> Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
-                  .equals(e.getPropertyValue(MetricCollectionProperty.class)))
-        .forEach(e -> assertEquals(PartitionerProperty.Value.DataSkewHashPartitioner,
-            e.getPropertyValue(PartitionerProperty.class).get())));
-
-    processedDAG.filterVertices(v -> v instanceof MetricCollectionBarrierVertex)
-        .forEach(metricV -> {
+    processedDAG.filterVertices(v -> v instanceof OperatorVertex
+      && ((OperatorVertex) v).getTransform() instanceof MetricCollectTransform)
+      .forEach(metricV -> {
           List<IRVertex> reducerV = processedDAG.getChildren(metricV.getId());
           reducerV.forEach(rV -> assertTrue(rV.getPropertyValue(ResourceSkewedDataProperty.class).get()));
         });
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
index 0d4cfa5e3..0de585ff4 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/RunTimeOptimizer.java
@@ -46,7 +46,7 @@ public static synchronized PhysicalPlan dynamicOptimization(
     // is a map of <hash value, partition size>.
     final PhysicalPlan physicalPlan =
       new DataSkewRuntimePass()
-        .apply(originalPlan, Pair.of(targetEdge, (Map<Integer, Long>) dynOptData));
+        .apply(originalPlan, Pair.of(targetEdge, (Map<Object, Long>) dynOptData));
     return physicalPlan;
   }
 }
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
index f277af1ac..a2b05f55a 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePass.java
@@ -39,11 +39,11 @@
 * this RuntimePass identifies a number of keys with big partition sizes (skewed keys)
 * and evenly redistributes data by overwriting incoming edges of destination tasks.
  */
-public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<Integer, Long>>> {
+public final class DataSkewRuntimePass extends RuntimePass<Pair<StageEdge, Map<Object, Long>>> {
   private static final Logger LOG = LoggerFactory.getLogger(DataSkewRuntimePass.class.getName());
   private final Set<Class<? extends RuntimeEventHandler>> eventHandlers;
   // Skewed keys denote the top n keys in terms of partition size.
-  public static final int DEFAULT_NUM_SKEWED_KEYS = 3;
+  public static final int DEFAULT_NUM_SKEWED_KEYS = 1;
   private int numSkewedKeys;
 
   /**
@@ -71,7 +71,7 @@ public DataSkewRuntimePass setNumSkewedKeys(final int numOfSkewedKeys) {
 
   @Override
   public PhysicalPlan apply(final PhysicalPlan originalPlan,
-                            final Pair<StageEdge, Map<Integer, Long>> metricData) {
+                            final Pair<StageEdge, Map<Object, Long>> metricData) {
     final StageEdge targetEdge = metricData.left();
     // Get number of evaluators of the next stage (number of blocks).
     final Integer dstParallelism = targetEdge.getDst().getPropertyValue(ParallelismProperty.class).
@@ -98,27 +98,29 @@ public PhysicalPlan apply(final PhysicalPlan originalPlan,
     return new PhysicalPlan(originalPlan.getPlanId(), stageDAG);
   }
 
-  public List<Integer> identifySkewedKeys(final Map<Integer, Long> keyValToPartitionSizeMap) {
+  public List<Long> identifySkewedKeys(final List<Long> partitionSizeList) {
     // Identify skewed keys.
-    List<Map.Entry<Integer, Long>> sortedMetricData = keyValToPartitionSizeMap.entrySet().stream()
-        .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
+    List<Long> sortedMetricData = partitionSizeList.stream()
+        .sorted(Comparator.reverseOrder())
         .collect(Collectors.toList());
-    List<Integer> skewedKeys = new ArrayList<>();
+    List<Long> skewedSizes = new ArrayList<>();
     for (int i = 0; i < numSkewedKeys; i++) {
-      skewedKeys.add(sortedMetricData.get(i).getKey());
-      LOG.info("Skewed key: Key {} Size {}", sortedMetricData.get(i).getKey(), sortedMetricData.get(i).getValue());
+      skewedSizes.add(sortedMetricData.get(i));
+      LOG.info("Skewed size: {}", sortedMetricData.get(i));
     }
 
-    return skewedKeys;
+    return skewedSizes;
   }
 
-  private boolean containsSkewedKey(final List<Integer> skewedKeys,
-                                    final int startingKey, final int finishingKey) {
-    for (int k = startingKey; k < finishingKey; k++) {
-      if (skewedKeys.contains(k)) {
+  private boolean containsSkewedSize(final List<Long> partitionSizeList,
+                                     final List<Long> skewedSizes,
+                                     final int startingKey, final int finishingKey) {
+    for (int i = startingKey; i < finishingKey; i++) {
+      if (skewedSizes.contains(partitionSizeList.get(i))) {
         return true;
       }
     }
+
     return false;
   }
 
@@ -133,24 +135,25 @@ private boolean containsSkewedKey(final List<Integer> skewedKeys,
    * @return the list of key ranges calculated.
    */
   @VisibleForTesting
-  public List<KeyRange> calculateKeyRanges(final Map<Integer, Long> keyToPartitionSizeMap,
+  public List<KeyRange> calculateKeyRanges(final Map<Object, Long> keyToPartitionSizeMap,
                                            final Integer dstParallelism) {
-    // Get the last key.
-    final int lastKey = keyToPartitionSizeMap.keySet().stream()
-        .max(Integer::compareTo)
-        .get();
+    final List<Long> partitionSizeList = new ArrayList<>();
+    keyToPartitionSizeMap.forEach((k, v) -> partitionSizeList.add(v));
+
+    // Get the last index.
+    final int lastKey = partitionSizeList.size() - 1;
 
-    // Identify skewed keys, which is top numSkewedKeys number of keys.
-    List<Integer> skewedKeys = identifySkewedKeys(keyToPartitionSizeMap);
+    // Identify skewed sizes, which are the partition sizes of the top numSkewedKeys keys.
+    List<Long> skewedSizes = identifySkewedKeys(partitionSizeList);
 
     // Calculate the ideal size for each destination task.
-    final Long totalSize = keyToPartitionSizeMap.values().stream().mapToLong(n -> n).sum(); // get total size
+    final Long totalSize = partitionSizeList.stream().mapToLong(n -> n).sum(); // get total size
     final Long idealSizePerTask = totalSize / dstParallelism; // and derive the ideal size per task
 
     final List<KeyRange> keyRanges = new ArrayList<>(dstParallelism);
     int startingKey = 0;
     int finishingKey = 1;
-    Long currentAccumulatedSize = keyToPartitionSizeMap.getOrDefault(startingKey, 0L);
+    Long currentAccumulatedSize = partitionSizeList.get(startingKey);
     Long prevAccumulatedSize = 0L;
     for (int i = 1; i <= dstParallelism; i++) {
       if (i != dstParallelism) {
@@ -158,21 +161,21 @@ private boolean containsSkewedKey(final List<Integer> skewedKeys,
         final Long idealAccumulatedSize = idealSizePerTask * i;
         // By adding partition sizes, find the accumulated size nearest to the given ideal size.
         while (currentAccumulatedSize < idealAccumulatedSize) {
-          currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
+          currentAccumulatedSize += partitionSizeList.get(finishingKey);
           finishingKey++;
         }
 
         final Long oneStepBack =
-            currentAccumulatedSize - keyToPartitionSizeMap.getOrDefault(finishingKey - 1, 0L);
+            currentAccumulatedSize - partitionSizeList.get(finishingKey - 1);
         final Long diffFromIdeal = currentAccumulatedSize - idealAccumulatedSize;
         final Long diffFromIdealOneStepBack = idealAccumulatedSize - oneStepBack;
         // Go one step back if we came too far.
         if (diffFromIdeal > diffFromIdealOneStepBack) {
           finishingKey--;
-          currentAccumulatedSize -= keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
+          currentAccumulatedSize -= partitionSizeList.get(finishingKey);
         }
 
-        boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, finishingKey);
+        boolean isSkewedKey = containsSkewedSize(partitionSizeList, skewedSizes, startingKey, finishingKey);
         keyRanges.add(i - 1, HashRange.of(startingKey, finishingKey, isSkewedKey));
         LOG.debug("KeyRange {}~{}, Size {}", startingKey, finishingKey - 1,
             currentAccumulatedSize - prevAccumulatedSize);
@@ -180,12 +183,12 @@ private boolean containsSkewedKey(final List<Integer> skewedKeys,
         prevAccumulatedSize = currentAccumulatedSize;
         startingKey = finishingKey;
       } else { // last one: we put the range of the rest.
-        boolean isSkewedKey = containsSkewedKey(skewedKeys, startingKey, lastKey + 1);
+        boolean isSkewedKey = containsSkewedSize(partitionSizeList, skewedSizes, startingKey, lastKey + 1);
         keyRanges.add(i - 1,
             HashRange.of(startingKey, lastKey + 1, isSkewedKey));
 
         while (finishingKey <= lastKey) {
-          currentAccumulatedSize += keyToPartitionSizeMap.getOrDefault(finishingKey, 0L);
+          currentAccumulatedSize += partitionSizeList.get(finishingKey);
           finishingKey++;
         }
         LOG.debug("KeyRange {}~{}, Size {}", startingKey, lastKey + 1,
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
index b1fa4fbea..6ecb7c486 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/PhysicalPlanGenerator.java
@@ -244,8 +244,7 @@ private void integrityCheck(final Stage stage) {
     stage.getIRDAG().getVertices().forEach(irVertex -> {
       // Check vertex type.
       if (!(irVertex instanceof  SourceVertex
-          || irVertex instanceof OperatorVertex
-          || irVertex instanceof MetricCollectionBarrierVertex)) {
+          || irVertex instanceof OperatorVertex)) {
         throw new UnsupportedOperationException(irVertex.toString());
       }
     });
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
index 2fd5c8b36..588e5d485 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/plan/StagePartitioner.java
@@ -75,7 +75,7 @@ public void addIgnoredPropertyKey(final Class<? extends VertexExecutionProperty>
       }
       // Get stage id of irVertex
       final int stageId = vertexToStageIdMap.get(irVertex);
-      // Step case: inductively assign stage ids based on mergability with irVertex
+      // Step case: inductively assign stage ids based on mergeability with irVertex
       for (final IREdge edge : irDAG.getOutgoingEdgesOf(irVertex)) {
         final IRVertex connectedIRVertex = edge.getDst();
         // Skip if it already has been assigned stageId
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 3c6bb8e3b..0ad5a2962 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -120,12 +120,11 @@ message BlockStateChangedMsg {
 }
 
 message DataSizeMetricMsg {
-    // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
     repeated PartitionSizeEntry partitionSize = 1;
 }
 
 message PartitionSizeEntry {
-    required int32 key = 1;
+    required string key = 1;
     required int64 size = 2;
 }
 
diff --git a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
index 04fca2bb1..d8948cff0 100644
--- a/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
+++ b/runtime/common/src/test/java/edu/snu/nemo/runtime/common/optimizer/pass/runtime/DataSkewRuntimePassTest.java
@@ -28,7 +28,7 @@
  * Test {@link DataSkewRuntimePass}.
  */
 public class DataSkewRuntimePassTest {
-  private final Map<Integer, Long> testMetricData = new HashMap<>();
+  private final Map<Object, Long> testMetricData = new HashMap<>();
 
   @Before
   public void setUp() {
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
index 692d01f31..3bb8cee4d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/BlockManagerWorker.java
@@ -232,15 +232,11 @@ public Block createBlock(final String blockId,
    *
    * @param block                the block to write.
    * @param blockStore           the store to save the block.
-   * @param reportPartitionSizes whether report the size of partitions to master or not.
-   * @param partitionSizeMap     the map of partition keys and sizes to report.
    * @param expectedReadTotal    the expected number of read for this block.
    * @param persistence          how to handle the used block.
    */
   public void writeBlock(final Block block,
                          final DataStoreProperty.Value blockStore,
-                         final boolean reportPartitionSizes,
-                         final Map<Integer, Long> partitionSizeMap,
                          final int expectedReadTotal,
                          final DataPersistenceProperty.Value persistence) {
     final String blockId = block.getId();
@@ -278,28 +274,6 @@ public void writeBlock(final Block block,
             .setType(ControlMessage.MessageType.BlockStateChanged)
             .setBlockStateChangedMsg(blockStateChangedMsgBuilder.build())
             .build());
-
-    if (reportPartitionSizes) {
-      final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new ArrayList<>();
-      partitionSizeMap.forEach((key, size) ->
-          partitionSizeEntries.add(
-              ControlMessage.PartitionSizeEntry.newBuilder()
-                  .setKey(key)
-                  .setSize(size)
-                  .build())
-      );
-
-      // TODO #4: Refactor metric aggregation for (general) run-rime optimization.
-      persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-          .send(ControlMessage.Message.newBuilder()
-              .setId(RuntimeIdManager.generateMessageId())
-              .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
-              .setType(ControlMessage.MessageType.DataSizeMetric)
-              .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
-                  .addAllPartitionSize(partitionSizeEntries)
-              )
-              .build());
-    }
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
index be42f6bae..1cee8271f 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/block/FileBlock.java
@@ -26,6 +26,8 @@
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
 import edu.snu.nemo.runtime.executor.data.metadata.PartitionMetadata;
 import edu.snu.nemo.runtime.executor.data.metadata.FileMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.*;
@@ -41,7 +43,7 @@
  */
 @NotThreadSafe
 public final class FileBlock<K extends Serializable> implements Block<K> {
-
+  private static final Logger LOG = LoggerFactory.getLogger(FileBlock.class.getName());
   private final String id;
   private final Map<K, SerializedPartition<K>> nonCommittedPartitionsMap;
   private final Serializer serializer;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
index 40150002e..16c125921 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/data/partition/SerializedPartition.java
@@ -18,6 +18,8 @@
 import edu.snu.nemo.common.DirectByteArrayOutputStream;
 import edu.snu.nemo.common.coder.EncoderFactory;
 import edu.snu.nemo.runtime.executor.data.streamchainer.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -31,6 +33,8 @@
  * @param <K> the key type of its partitions.
  */
 public final class SerializedPartition<K> implements Partition<byte[], K> {
+  private static final Logger LOG = LoggerFactory.getLogger(SerializedPartition.class.getName());
+
   private final K key;
   private volatile byte[] serializedData;
   private volatile int length;
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
index 0796b0117..260f12678 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputCollectorImpl.java
@@ -15,7 +15,10 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
+import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.ir.OutputCollector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.*;
 
@@ -25,22 +28,26 @@
  * @param <O> output type.
  */
 public final class OutputCollectorImpl<O> implements OutputCollector<O> {
+  private static final Logger LOG = LoggerFactory.getLogger(OutputCollectorImpl.class.getName());
   private final Set<String> mainTagOutputChildren;
   // Use ArrayList (not Queue) to allow 'null' values
   private final ArrayList<O> mainTagElements;
-  private final Map<String, ArrayList<Object>> additionalTagElementsMap;
+  // Key: Pair of tag and destination vertex id
+  // Value: data elements which will be input to the tagged destination vertex
+  private final Map<Pair<String, String>, ArrayList<Object>> additionalTaggedChildToElementsMap;
 
   /**
    * Constructor of a new OutputCollectorImpl with tagged outputs.
    * @param mainChildren   main children vertices
-   * @param taggedChildren additional children vertices
+   * @param tagToChildren additional children vertices
    */
   public OutputCollectorImpl(final Set<String> mainChildren,
-                             final List<String> taggedChildren) {
+                             final Map<String, String> tagToChildren) {
     this.mainTagOutputChildren = mainChildren;
     this.mainTagElements = new ArrayList<>(1);
-    this.additionalTagElementsMap = new HashMap<>();
-    taggedChildren.forEach(child -> this.additionalTagElementsMap.put(child, new ArrayList<>(1)));
+    this.additionalTaggedChildToElementsMap = new HashMap<>();
+    tagToChildren.forEach((tag, child) ->
+      this.additionalTaggedChildToElementsMap.put(Pair.of(tag, child), new ArrayList<>(1)));
   }
 
   @Override
@@ -55,10 +62,7 @@ public void emit(final O output) {
       emit((O) output);
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      final List<Object> dataElements = this.additionalTagElementsMap.get(dstVertexId);
-      if (dataElements == null) {
-        throw new IllegalArgumentException("Wrong destination vertex id passed!");
-      }
+      final List<Object> dataElements = getAdditionalTaggedDataFromDstVertexId(dstVertexId);
       dataElements.add(output);
     }
   }
@@ -72,11 +76,7 @@ public void emit(final O output) {
       // This dstVertexId is for the main tag
       return (Iterable<Object>) iterateMain();
     } else {
-      final List<Object> dataElements = this.additionalTagElementsMap.get(tag);
-      if (dataElements == null) {
-        throw new IllegalArgumentException("Wrong destination vertex id passed!");
-      }
-      return dataElements;
+      return getAdditionalTaggedDataFromTag(tag);
     }
   }
 
@@ -90,10 +90,7 @@ public void clearTag(final String tag) {
       clearMain();
     } else {
       // Note that String#hashCode() can be cached, thus accessing additional output queues can be fast.
-      final List<Object> dataElements = this.additionalTagElementsMap.get(tag);
-      if (dataElements == null) {
-        throw new IllegalArgumentException("Wrong destination vertex id passed!");
-      }
+      final List<Object> dataElements = getAdditionalTaggedDataFromTag(tag);
       dataElements.clear();
     }
   }
@@ -106,11 +103,31 @@ public void clearTag(final String tag) {
     if (this.mainTagOutputChildren.contains(dstVertexId)) {
       return (List<Object>) this.mainTagElements;
     } else {
-      final List<Object> dataElements = this.additionalTagElementsMap.get(dstVertexId);
-      if (dataElements == null) {
-        throw new IllegalArgumentException("Wrong destination vertex id passed!");
-      }
-      return dataElements;
+      return getAdditionalTaggedDataFromDstVertexId(dstVertexId);
     }
   }
+
+  private List<Object> getAdditionalTaggedDataFromDstVertexId(final String dstVertexId) {
+    final Pair<String, String> tagAndChild =
+      this.additionalTaggedChildToElementsMap.keySet().stream()
+        .filter(key -> key.right().equals(dstVertexId))
+        .findAny().orElseThrow(() -> new RuntimeException("Wrong destination vertex id passed!"));
+    final List<Object> dataElements = this.additionalTaggedChildToElementsMap.get(tagAndChild);
+    if (dataElements == null) {
+      throw new IllegalArgumentException("Wrong destination vertex id passed!");
+    }
+    return dataElements;
+  }
+
+  private List<Object> getAdditionalTaggedDataFromTag(final String tag) {
+    final Pair<String, String> tagAndChild =
+      this.additionalTaggedChildToElementsMap.keySet().stream()
+        .filter(key -> key.left().equals(tag))
+        .findAny().orElseThrow(() -> new RuntimeException("Wrong tag " + tag + " passed!"));
+    final List<Object> dataElements = this.additionalTaggedChildToElementsMap.get(tagAndChild);
+    if (dataElements == null) {
+      throw new IllegalArgumentException("Wrong tag " + tag + " passed!");
+    }
+    return dataElements;
+  }
 }
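
The reworked OutputCollectorImpl above keys its additional-output buffers by a
(tag, destination vertex id) pair, so the same buffer can be resolved either by
tag or by destination vertex id. A minimal JDK-only sketch of that double
lookup (java.util.AbstractMap.SimpleEntry stands in for Nemo's Pair; the tag
and vertex id values are made up):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class TaggedRoutingSketch {
      public static void main(final String[] args) {
        // tag -> child vertex id, as passed to the new OutputCollectorImpl constructor.
        final Map<String, String> tagToChildren = Map.of("DynOptData", "vertex-7");

        // Key: (tag, child id) pair; value: elements destined for that tagged child.
        final Map<Map.Entry<String, String>, List<Object>> taggedElements = new HashMap<>();
        tagToChildren.forEach((tag, child) ->
            taggedElements.put(new SimpleEntry<>(tag, child), new ArrayList<>()));

        // emit(dstVertexId, output): resolve the buffer by the right side of the pair.
        taggedElements.entrySet().stream()
            .filter(e -> e.getKey().getValue().equals("vertex-7"))
            .findAny()
            .orElseThrow(() -> new RuntimeException("Wrong destination vertex id passed!"))
            .getValue()
            .add("some-element");

        // iterateTag(tag): resolve the same buffer by the left side of the pair.
        taggedElements.entrySet().stream()
            .filter(e -> e.getKey().getKey().equals("DynOptData"))
            .findAny()
            .ifPresent(e -> System.out.println(e.getValue())); // prints [some-element]
      }
    }
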
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
index 162f49151..b2f2b9abc 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -62,7 +62,6 @@
     this.blockStoreValue = runtimeEdge.getPropertyValue(DataStoreProperty.class).
         orElseThrow(() -> new RuntimeException("No data store property on the edge"));
 
-
     // Setup partitioner
     final int dstParallelism = dstIrVertex.getPropertyValue(ParallelismProperty.class).
         orElseThrow(() -> new RuntimeException("No parallelism property on the destination vertex"));
@@ -125,8 +124,6 @@ public void close() {
         runtimeEdge.getPropertyValue(DataPersistenceProperty.class).
             orElseThrow(() -> new RuntimeException("No data persistence property on the edge"));
 
-    final boolean isDataSizeMetricCollectionEdge = Optional.of(MetricCollectionProperty.Value.DataSkewRuntimePass)
-        .equals(runtimeEdge.getPropertyValue(MetricCollectionProperty.class));
     final Optional<Map<Integer, Long>> partitionSizeMap = blockToWrite.commit();
     // Return the total size of the committed block.
     if (partitionSizeMap.isPresent()) {
@@ -135,13 +132,10 @@ public void close() {
         blockSizeTotal += partitionSize;
       }
       this.writtenBytes = blockSizeTotal;
-      blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
-          partitionSizeMap.get(), getExpectedRead(), persistence);
     } else {
       this.writtenBytes = -1; // no written bytes info.
-      blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, isDataSizeMetricCollectionEdge,
-          Collections.emptyMap(), getExpectedRead(), persistence);
     }
+    blockManagerWorker.writeBlock(blockToWrite, blockStoreValue, getExpectedRead(), persistence);
   }
 
   /**
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 4b156d9be..ca5476a68 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -22,6 +22,7 @@
 import edu.snu.nemo.common.ir.edge.executionproperty.AdditionalOutputTagProperty;
 import edu.snu.nemo.common.ir.edge.executionproperty.BroadcastVariableIdProperty;
 import edu.snu.nemo.common.ir.vertex.*;
+import edu.snu.nemo.common.ir.vertex.transform.AggregateMetricTransform;
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.runtime.common.RuntimeIdManager;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
@@ -56,6 +57,7 @@
 public final class TaskExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class.getName());
   private static final int NONE_FINISHED = -1;
+  private static final String NULL_KEY = "NULL";
 
   // Essential information
   private boolean isExecuted;
@@ -156,7 +158,15 @@ public TaskExecutor(final Task task,
       // Prepare data WRITE
       // Child-task writes
       final Map<String, String> additionalOutputMap =
-        getAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), irVertexDag);
+          getAdditionalOutputMap(irVertex, task.getTaskOutgoingEdges(), irVertexDag);
+
+      final List<Boolean> isToAdditionalTagOutputs = children.stream()
+          .map(harness -> harness.getIRVertex().getId())
+          .map(additionalOutputMap::containsValue)
+          .collect(Collectors.toList());
+
+      // Handle writes
+      // Main output children task writes
       final List<OutputWriter> mainChildrenTaskWriters = getMainChildrenTaskWriters(
         irVertex, task.getTaskOutgoingEdges(), dataTransferFactory, additionalOutputMap);
       final Map<String, OutputWriter> additionalChildrenTaskWriters = getAdditionalChildrenTaskWriters(
@@ -164,12 +174,8 @@ public TaskExecutor(final Task task,
       // Intra-task writes
       final List<String> additionalOutputVertices = new ArrayList<>(additionalOutputMap.values());
       final Set<String> mainChildren =
-        getMainOutputVertices(irVertex, irVertexDag, task.getTaskOutgoingEdges(), additionalOutputVertices);
-      final OutputCollectorImpl oci = new OutputCollectorImpl(mainChildren, additionalOutputVertices);
-      final List<Boolean> isToAdditionalTagOutputs = children.stream()
-        .map(harness -> harness.getIRVertex().getId())
-        .map(additionalOutputMap::containsValue)
-        .collect(Collectors.toList());
+          getMainOutputVertices(irVertex, irVertexDag, task.getTaskOutgoingEdges(), additionalOutputVertices);
+      final OutputCollectorImpl oci = new OutputCollectorImpl(mainChildren, additionalOutputMap);
 
       // Create VERTEX HARNESS
       final VertexHarness vertexHarness = new VertexHarness(
@@ -231,27 +237,26 @@ public TaskExecutor(final Task task,
   private void processElementRecursively(final VertexHarness vertexHarness, final Object dataElement) {
     final IRVertex irVertex = vertexHarness.getIRVertex();
     final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
+
     if (irVertex instanceof SourceVertex) {
       outputCollector.emit(dataElement);
     } else if (irVertex instanceof OperatorVertex) {
       final Transform transform = ((OperatorVertex) irVertex).getTransform();
       transform.onData(dataElement);
-    } else if (irVertex instanceof MetricCollectionBarrierVertex) {
-      outputCollector.emit(dataElement);
-      setIRVertexPutOnHold((MetricCollectionBarrierVertex) irVertex);
     } else {
       throw new UnsupportedOperationException("This type of IRVertex is not supported");
     }
 
     // Given a single input element, a vertex can produce many output elements.
-    // Here, we recursively process all of the main oltput elements.
-    outputCollector.iterateMain().forEach(element -> handleMainOutputElement(vertexHarness, element)); // Recursion
+    // Here, we recursively process all of the main output elements.
+    outputCollector.iterateMain().forEach(element ->
+      handleMainOutputElement(vertexHarness, element)); // Recursion
     outputCollector.clearMain();
 
     // Recursively process all of the additional output elements.
-    vertexHarness.getContext().getTagToAdditionalChildren().values().forEach(tag -> {
-      outputCollector.iterateTag(tag).forEach(
-        element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+    vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
+      outputCollector.iterateTag(tag).forEach(element ->
+        handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
       outputCollector.clearTag(tag);
     });
   }
@@ -310,21 +315,67 @@ private void doExecute() {
     }
   }
 
+  /**
+   * Send aggregated statistics for dynamic optimization to master.
+   * @param dynOptData the statistics to send.
+   */
+  public void sendDynOptData(final Object dynOptData) {
+    Map<Object, Long> aggregatedDynOptData = (Map<Object, Long>) dynOptData;
+    final List<ControlMessage.PartitionSizeEntry> partitionSizeEntries = new ArrayList<>();
+    aggregatedDynOptData.forEach((key, size) ->
+      partitionSizeEntries.add(
+        ControlMessage.PartitionSizeEntry.newBuilder()
+          .setKey(key == null ? NULL_KEY : String.valueOf(key))
+          .setSize(size)
+          .build())
+    );
+
+    persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+      .send(ControlMessage.Message.newBuilder()
+        .setId(RuntimeIdManager.generateMessageId())
+        .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
+        .setType(ControlMessage.MessageType.DataSizeMetric)
+        .setDataSizeMetricMsg(ControlMessage.DataSizeMetricMsg.newBuilder()
+          .addAllPartitionSize(partitionSizeEntries)
+        )
+        .build());
+  }
+
   private void finalizeVertex(final VertexHarness vertexHarness) {
     closeTransform(vertexHarness);
-    final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
 
-    // handle main outputs
-    outputCollector.iterateMain().forEach(element -> handleMainOutputElement(vertexHarness, element)); // Recursion
-    outputCollector.clearMain();
-
-    // handle additional tagged outputs
-    vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
-      outputCollector.iterateTag(tag).forEach(
-        element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
-      outputCollector.clearTag(tag);
-    });
-    finalizeOutputWriters(vertexHarness);
+    final OutputCollectorImpl outputCollector = vertexHarness.getOutputCollector();
+    final IRVertex v = vertexHarness.getIRVertex();
+    if (v instanceof OperatorVertex
+      && ((OperatorVertex) v).getTransform() instanceof AggregateMetricTransform) {
+      // send aggregated dynamic optimization data to master
+      final Object aggregatedDynOptData = outputCollector.iterateMain().iterator().next();
+      sendDynOptData(aggregatedDynOptData);
+      // set the id of this vertex to mark the corresponding stage as put on hold
+      setIRVertexPutOnHold(v);
+    } else {
+      // handle main outputs
+      outputCollector.iterateMain().forEach(element -> {
+        handleMainOutputElement(vertexHarness, element);
+      }); // Recursion
+      outputCollector.clearMain();
+
+      // handle intra-task additional tagged outputs
+      vertexHarness.getAdditionalTagOutputChildren().keySet().forEach(tag -> {
+        outputCollector.iterateTag(tag).forEach(
+          element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+        outputCollector.clearTag(tag);
+      });
+
+      // handle inter-task additional tagged outputs
+      vertexHarness.getTagToAdditionalChildrenId().keySet().forEach(tag -> {
+        outputCollector.iterateTag(tag).forEach(
+          element -> handleAdditionalOutputElement(vertexHarness, element, tag)); // Recursion
+        outputCollector.clearTag(tag);
+      });
+
+      finalizeOutputWriters(vertexHarness);
+    }
   }
 
   private void handleMainOutputElement(final VertexHarness harness, final Object element) {
@@ -357,7 +408,6 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
       for (int i = 0; i < availableFetchers.size(); i++) {
         final DataFetcher dataFetcher = availableFetchers.get(i);
         final Object element;
-
         try {
           element = dataFetcher.fetchDataElement();
         } catch (NoSuchElementException e) {
@@ -492,7 +542,7 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
    * @param outEdgesToChildrenTasks outgoing edges to child tasks
    * @param dataTransferFactory     dataTransferFactory
    * @param taggedOutputs           tag to vertex id map
-   * @return additional children vertex id to OutputWriters map.
+   * @return additional tag to OutputWriters map.
    */
   private Map<String, OutputWriter> getAdditionalChildrenTaskWriters(final IRVertex irVertex,
                                                                      final List<StageEdge> outEdgesToChildrenTasks,
@@ -501,12 +551,17 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
     final Map<String, OutputWriter> additionalChildrenTaskWriters = new HashMap<>();
 
     outEdgesToChildrenTasks
-      .stream()
-      .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
-      .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
-      .forEach(outEdgeForThisVertex ->
-        additionalChildrenTaskWriters.put(outEdgeForThisVertex.getDstIRVertex().getId(),
-          dataTransferFactory.createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex)));
+        .stream()
+        .filter(outEdge -> outEdge.getSrcIRVertex().getId().equals(irVertex.getId()))
+        .filter(outEdge -> taggedOutputs.containsValue(outEdge.getDstIRVertex().getId()))
+        .forEach(outEdgeForThisVertex -> {
+          final String tag = taggedOutputs.entrySet().stream()
+            .filter(e -> e.getValue().equals(outEdgeForThisVertex.getDstIRVertex().getId()))
+            .findAny().orElseThrow(() -> new RuntimeException("Unexpected error while finding tag"))
+            .getKey();
+          additionalChildrenTaskWriters.put(tag,
+              dataTransferFactory.createWriter(taskId, outEdgeForThisVertex.getDstIRVertex(), outEdgeForThisVertex));
+        });
 
     return additionalChildrenTaskWriters;
   }
@@ -530,18 +585,21 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
 
   private void prepareTransform(final VertexHarness vertexHarness) {
     final IRVertex irVertex = vertexHarness.getIRVertex();
+    final Transform transform;
     if (irVertex instanceof OperatorVertex) {
-      final Transform transform = ((OperatorVertex) irVertex).getTransform();
+      transform = ((OperatorVertex) irVertex).getTransform();
       transform.prepare(vertexHarness.getContext(), vertexHarness.getOutputCollector());
     }
   }
 
   private void closeTransform(final VertexHarness vertexHarness) {
     final IRVertex irVertex = vertexHarness.getIRVertex();
+    final Transform transform;
     if (irVertex instanceof OperatorVertex) {
-      Transform transform = ((OperatorVertex) irVertex).getTransform();
+      transform = ((OperatorVertex) irVertex).getTransform();
       transform.close();
     }
+
     vertexHarness.getContext().getSerializedData().ifPresent(data ->
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
         ControlMessage.Message.newBuilder()
@@ -554,7 +612,7 @@ private void closeTransform(final VertexHarness vertexHarness) {
 
   ////////////////////////////////////////////// Misc
 
-  private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex) {
+  private void setIRVertexPutOnHold(final IRVertex irVertex) {
     idOfVertexPutOnHold = irVertex.getId();
   }
 
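
The new finalizeVertex above branches on the transform type: a vertex carrying
AggregateMetricTransform ships its single aggregated element to the master and
puts the stage on hold, while every other vertex flushes its outputs as before.
A minimal JDK-only sketch of that branching (all types and names here are
illustrative stand-ins, not the Nemo classes themselves):

    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;

    public final class FinalizeSketch {
      interface Transform { }
      static final class AggregateMetricTransformStandIn implements Transform { }

      static void finalizeVertex(final Transform transform,
                                 final List<Object> mainOutputs,
                                 final Consumer<Object> sendDynOptDataToMaster,
                                 final Consumer<Object> handleMainOutputElement) {
        if (transform instanceof AggregateMetricTransformStandIn) {
          // Aggregated statistics: ship to master; the stage is then put on hold.
          sendDynOptDataToMaster.accept(mainOutputs.iterator().next());
        } else {
          // Normal path: flush main outputs downstream.
          mainOutputs.forEach(handleMainOutputElement);
        }
      }

      public static void main(final String[] args) {
        final List<Object> outputs = List.of(Map.of("a", 55L));
        finalizeVertex(new AggregateMetricTransformStandIn(), outputs,
            data -> System.out.println("to master: " + data),
            elem -> System.out.println("downstream: " + elem));
        // prints "to master: {a=55}"
      }
    }
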
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
index 2ad88683b..f8ea8e054 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/VertexHarness.java
@@ -19,6 +19,8 @@
 import edu.snu.nemo.common.ir.vertex.transform.Transform;
 import edu.snu.nemo.runtime.executor.datatransfer.OutputCollectorImpl;
 import edu.snu.nemo.runtime.executor.datatransfer.OutputWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -29,6 +31,8 @@
  * Captures the relationship between a non-source IRVertex's outputCollector and its mainTagChildren vertices.
  */
 final class VertexHarness {
+  private static final Logger LOG = LoggerFactory.getLogger(VertexHarness.class.getName());
+
   // IRVertex and transform-specific information
   private final IRVertex irVertex;
   private final OutputCollectorImpl outputCollector;
@@ -37,6 +41,7 @@
 
   // These lists can be empty
   private final Map<String, VertexHarness> additionalTagOutputChildren;
+  private final Map<String, String> tagToAdditionalChildrenId;
   private final List<OutputWriter> writersToMainChildrenTasks;
   private final Map<String, OutputWriter> writersToAdditionalChildrenTasks;
 
@@ -54,14 +59,18 @@
     }
     final Map<String, String> taggedOutputMap = context.getTagToAdditionalChildren();
     final Map<String, VertexHarness> tagged = new HashMap<>();
+
+    // Classify input type for intra-task children
     for (int i = 0; i < children.size(); i++) {
       final VertexHarness child = children.get(i);
       if (isAdditionalTagOutputs.get(i)) {
         taggedOutputMap.entrySet().stream()
-            .filter(kv -> child.getIRVertex().getId().equals(kv.getValue()))
-            .forEach(kv -> tagged.put(kv.getValue(), child));
+          .filter(kv -> child.getIRVertex().getId().equals(kv.getValue()))
+          .forEach(kv -> tagged.put(kv.getKey(), child));
       }
     }
+
+    this.tagToAdditionalChildrenId = context.getTagToAdditionalChildren();
     this.additionalTagOutputChildren = tagged;
     final List<VertexHarness> mainTagChildrenTmp = new ArrayList<>(children);
     mainTagChildrenTmp.removeAll(additionalTagOutputChildren.values());
@@ -99,6 +108,13 @@ OutputCollectorImpl getOutputCollector() {
     return additionalTagOutputChildren;
   }
 
+  /**
+   * @return map of tag to additional children id.
+   */
+  public Map<String, String> getTagToAdditionalChildrenId() {
+    return tagToAdditionalChildrenId;
+  }
+
   /**
    * @return OutputWriters for main outputs of this irVertex. (empty if none exists)
    */
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
index 0d0a69b86..01af83ffc 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/datatransfer/DataTransferTest.java
@@ -15,9 +15,7 @@
  */
 package edu.snu.nemo.runtime.executor.datatransfer;
 
-import edu.snu.nemo.common.DataSkewMetricFactory;
-import edu.snu.nemo.common.HashRange;
-import edu.snu.nemo.common.KeyRange;
+import edu.snu.nemo.common.*;
 import edu.snu.nemo.common.coder.*;
 import edu.snu.nemo.common.eventhandler.PubSubEventHandlerWrapper;
 import edu.snu.nemo.common.ir.edge.IREdge;
@@ -29,7 +27,6 @@
 import edu.snu.nemo.common.ir.vertex.executionproperty.ScheduleGroupProperty;
 import edu.snu.nemo.common.test.EmptyComponents;
 import edu.snu.nemo.conf.JobConf;
-import edu.snu.nemo.common.Pair;
 import edu.snu.nemo.common.dag.DAG;
 import edu.snu.nemo.common.dag.DAGBuilder;
 import edu.snu.nemo.common.ir.executionproperty.ExecutionPropertyMap;
@@ -48,6 +45,7 @@
 import edu.snu.nemo.runtime.executor.data.SerializerManager;
 import edu.snu.nemo.runtime.master.*;
 import edu.snu.nemo.runtime.master.eventhandler.UpdatePhysicalPlanEventHandler;
+import org.apache.beam.sdk.values.KV;
 import org.apache.commons.io.FileUtils;
 import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.io.network.naming.NameResolverConfiguration;
@@ -68,6 +66,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.*;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -299,7 +298,7 @@ private void writeAndRead(final BlockManagerWorker sender,
 
     // Edge setup
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
-    dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
+    dummyIREdge.setProperty(KeyExtractorProperty.of(element -> element));
     dummyIREdge.setProperty(CommunicationPatternProperty.of(commPattern));
     dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
     dummyIREdge.setProperty(DataStoreProperty.of(store));
@@ -387,7 +386,7 @@ private void writeAndReadWithDuplicateData(final BlockManagerWorker sender,
     final IREdge dummyIREdge = new IREdge(commPattern, srcVertex, dstVertex);
     dummyIREdge.setProperty(EncoderProperty.of(ENCODER_FACTORY));
     dummyIREdge.setProperty(DecoderProperty.of(DECODER_FACTORY));
-    dummyIREdge.setProperty(KeyExtractorProperty.of((element -> element)));
+    dummyIREdge.setProperty(KeyExtractorProperty.of(element -> element));
     dummyIREdge.setProperty(CommunicationPatternProperty.of(commPattern));
     dummyIREdge.setProperty(PartitionerProperty.of(PartitionerProperty.Value.HashPartitioner));
     dummyIREdge.setProperty(DuplicateEdgeGroupProperty.of(new DuplicateEdgeGroupPropertyValue("dummy")));
@@ -532,5 +531,5 @@ private Stage setupStages(final String stageId) {
     stageExecutionProperty.put(ScheduleGroupProperty.of(0));
     return new Stage(stageId, emptyDag, stageExecutionProperty, Collections.emptyList());
   }
-
 }
+
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
index 47671ed54..439c4ffcc 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/DataSkewDynOptDataHandler.java
@@ -25,7 +25,7 @@
  * Handler for aggregating data used in data skew dynamic optimization.
  */
 public class DataSkewDynOptDataHandler implements DynOptDataHandler {
-  private final Map<Integer, Long> aggregatedDynOptData;
+  private final Map<Object, Long> aggregatedDynOptData;
 
   public DataSkewDynOptDataHandler() {
     this.aggregatedDynOptData = new HashMap<>();
@@ -40,12 +40,12 @@ public final void updateDynOptData(final Object dynOptData) {
     List<ControlMessage.PartitionSizeEntry> partitionSizeInfo
         = (List<ControlMessage.PartitionSizeEntry>) dynOptData;
     partitionSizeInfo.forEach(partitionSizeEntry -> {
-      final int hashIndex = partitionSizeEntry.getKey();
+      final Object key = partitionSizeEntry.getKey();
       final long partitionSize = partitionSizeEntry.getSize();
-      if (aggregatedDynOptData.containsKey(hashIndex)) {
-        aggregatedDynOptData.compute(hashIndex, (originalKey, originalValue) -> originalValue + partitionSize);
+      if (aggregatedDynOptData.containsKey(key)) {
+        aggregatedDynOptData.compute(key, (originalKey, originalValue) -> originalValue + partitionSize);
       } else {
-        aggregatedDynOptData.put(hashIndex, partitionSize);
+        aggregatedDynOptData.put(key, partitionSize);
       }
     });
   }
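
The master-side handler above simply sums the reported partition sizes per key
across executors. A minimal JDK-only sketch of that merge (protobuf's
PartitionSizeEntry is stood in by Map.Entry<String, Long>; the executor names
are made up):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class DynOptDataMergeSketch {
      public static void main(final String[] args) {
        final Map<Object, Long> aggregatedDynOptData = new HashMap<>();

        // Two executors report partial (key, partition size) statistics.
        final List<Map.Entry<String, Long>> fromExecutorA =
            List.of(new SimpleEntry<>("a", 30L), new SimpleEntry<>("b", 5L));
        final List<Map.Entry<String, Long>> fromExecutorB =
            List.of(new SimpleEntry<>("a", 25L), new SimpleEntry<>("c", 8L));

        // updateDynOptData logic: sum the reported sizes per key across executors.
        for (final List<Map.Entry<String, Long>> report : List.of(fromExecutorA, fromExecutorB)) {
          report.forEach(entry ->
              aggregatedDynOptData.merge(entry.getKey(), entry.getValue(), Long::sum));
        }

        System.out.println(aggregatedDynOptData); // {a=55, b=5, c=8} (order may vary)
      }
    }
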
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 312973293..de6df9241 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -378,7 +378,6 @@ private void handleControlMessage(final ControlMessage.Message message) {
         LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
         throw new RuntimeException(exception);
       case DataSizeMetric:
-        // TODO #96: Modularize DataSkewPolicy to use MetricVertex and BarrierVertex.
         ((BatchScheduler) scheduler).updateDynOptData(message.getDataSizeMetricMsg().getPartitionSizeList());
         break;
       case MetricMessageReceived:
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
index 5ccc2fc96..21e196e7b 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/scheduler/BatchScheduler.java
@@ -419,7 +419,14 @@ private void onTaskExecutionComplete(final String executorId,
   }
 
   /**
-   * @param taskId the metric collected task ID.
+   * Get the target edge of dynamic optimization.
+   * The target edge is an outgoing edge of a parent of the stage put on hold,
+   * and is annotated with {@link MetricCollectionProperty}.
+   *
+   * See {@link edu.snu.nemo.compiler.optimizer.pass.compiletime.reshaping.SkewReshapingPass}
+   * for setting the target edge of dynamic optimization.
+   *
+   * @param taskId the task ID that sent the stage-level aggregated metric for dynamic optimization.
    * @return the edge to optimize.
    */
   private StageEdge getEdgeToOptimize(final String taskId) {
@@ -429,8 +436,18 @@ private StageEdge getEdgeToOptimize(final String taskId) {
       .findFirst()
       .orElseThrow(() -> new RuntimeException());
 
+    // The stage put on hold (i.e., the stage with the vertex containing AggregateMetricTransform)
+    // should have a parent stage whose outgoing edges contain the target edge of dynamic optimization.
+    final List<Stage> parentStages = planStateManager.getPhysicalPlan().getStageDAG()
+      .getParents(stagePutOnHold.getId());
+
+    if (parentStages.size() > 1) {
+      throw new RuntimeException("Error in setting target edge of dynamic optimization!");
+    }
+
     // Get outgoing edges of that stage with MetricCollectionProperty
-    List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG().getOutgoingEdgesOf(stagePutOnHold);
+    List<StageEdge> stageEdges = planStateManager.getPhysicalPlan().getStageDAG()
+      .getOutgoingEdgesOf(parentStages.get(0));
     for (StageEdge edge : stageEdges) {
       if (edge.getExecutionProperties().containsKey(MetricCollectionProperty.class)) {
         return edge;


 
