You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@nemo.apache.org by GitBox <gi...@apache.org> on 2018/07/03 08:29:08 UTC

[GitHub] jeongyooneo closed pull request #61: [NEMO-20] RESTful APIs to Access Job State and Metric

jeongyooneo closed pull request #61: [NEMO-20] RESTful APIs to Access Job State and Metric
URL: https://github.com/apache/incubator-nemo/pull/61
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java b/common/src/main/java/edu/snu/nemo/common/exception/UnsupportedMetricException.java
similarity index 62%
rename from runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
rename to common/src/main/java/edu/snu/nemo/common/exception/UnsupportedMetricException.java
index d81db8c25..f7a32964e 100644
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/parameter/MetricFlushPeriod.java
+++ b/common/src/main/java/edu/snu/nemo/common/exception/UnsupportedMetricException.java
@@ -13,14 +13,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package edu.snu.nemo.runtime.common.metric.parameter;
-
-import org.apache.reef.tang.annotations.Name;
-import org.apache.reef.tang.annotations.NamedParameter;
+package edu.snu.nemo.common.exception;
 
 /**
- * Metric flushing period.
+ * UnsupportedMetricException.
+ * Thrown when MetricStore receives an unsupported metric.
  */
-@NamedParameter(doc = "Metric flushing period (ms)", short_name = "mf_period", default_value = "5000")
-public final class MetricFlushPeriod implements Name<Long> {
+public final class UnsupportedMetricException extends RuntimeException {
+  /**
+   * UnsupportedMetricException.
+   * @param cause cause
+   */
+  public UnsupportedMetricException(final Throwable cause) {
+    super(cause);
+  }
 }
diff --git a/pom.xml b/pom.xml
index 95939f09c..42ad7abe7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,8 @@ limitations under the License.
         <jackson.version>2.8.8</jackson.version>
         <netlib.version>1.1.2</netlib.version>
         <netty.version>4.1.16.Final</netty.version>
+        <jetty-server.version>9.4.10.v20180503</jetty-server.version>
+        <jetty-servlet.version>9.4.10.v20180503</jetty-servlet.version>
         <slf4j.version>1.7.20</slf4j.version>
         <!-- Tests -->
         <mockito.version>2.13.0</mockito.version>
diff --git a/runtime/common/pom.xml b/runtime/common/pom.xml
index cedf02d02..97fca5ba1 100644
--- a/runtime/common/pom.xml
+++ b/runtime/common/pom.xml
@@ -68,5 +68,15 @@ limitations under the License.
             <version>${grpc.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/DataTransferEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/DataTransferEvent.java
new file mode 100644
index 000000000..3f282445c
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/DataTransferEvent.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+/**
+ * Event for data transfer, such as data read or write.
+ */
+public class DataTransferEvent extends Event {
+  private TransferType transferType;
+
+  public DataTransferEvent(final long timestamp, final TransferType transferType) {
+    super(timestamp);
+    this.transferType = transferType;
+  }
+
+  /**
+   * Get transfer type.
+   * @return TransferType.
+   */
+  public final TransferType getTransferType() {
+    return transferType;
+  }
+
+  /**
+   * Set transfer type.
+   * @param transferType TransferType to set.
+   */
+  public final void setTransferType(final TransferType transferType) {
+    this.transferType = transferType;
+  }
+
+  /**
+   * Enum of transfer types.
+   */
+  public enum TransferType {
+    READ_START,
+    READ_END,
+    WRITE_START,
+    WRITE_END
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Event.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Event.java
new file mode 100644
index 000000000..0746bd2b5
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Event.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import java.io.Serializable;
+
+/**
+ * Base class for all generic events, each of which carries a timestamp.
+ */
+public class Event implements Serializable {
+  private long timestamp;
+
+  /**
+   * Constructor.
+   * @param timestamp timestamp in milliseconds.
+   */
+  public Event(final long timestamp) {
+    this.timestamp = timestamp;
+  }
+
+  /**
+   * Get timestamp.
+   * @return timestamp.
+   */
+  public final long getTimestamp() {
+    return timestamp;
+  };
+
+  /**
+   * Set timestamp.
+   * @param timestamp timestamp in milliseconds.
+   */
+  public final void setTimestamp(final long timestamp) {
+    this.timestamp = timestamp;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
new file mode 100644
index 000000000..fea3a20a0
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/JobMetric.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.dag.DAG;
+import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
+import edu.snu.nemo.runtime.common.state.JobState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metric class for Job (or {@link PhysicalPlan}).
+ */
+public final class JobMetric implements StateMetric<JobState.State> {
+  private String id;
+  private List<StateTransitionEvent<JobState.State>> stateTransitionEvents = new ArrayList<>();
+  private JsonNode stageDagJson;
+
+  public JobMetric(final PhysicalPlan physicalPlan) {
+    this.id = physicalPlan.getId();
+  }
+
+  public JobMetric(final String id) {
+    this.id = id;
+  }
+
+  @JsonProperty("dag")
+  public JsonNode getStageDAG() {
+    return stageDagJson;
+  }
+
+  public void setStageDAG(final DAG dag) {
+    final String dagJson = dag.toString();
+    final ObjectMapper objectMapper = new ObjectMapper();
+    try {
+      this.stageDagJson = objectMapper.readTree(dagJson);
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public List<StateTransitionEvent<JobState.State>> getStateTransitionEvents() {
+    return stateTransitionEvents;
+  }
+
+  @Override
+  public void addEvent(final JobState.State prevState, final JobState.State newState) {
+    stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
+  }
+
+  @Override
+  public boolean processMetricMessage(final String metricField, final byte[] metricValue) {
+    // do nothing
+    return false;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Metric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Metric.java
new file mode 100644
index 000000000..1d44c0867
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/Metric.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+/**
+ * Interface for all metrics.
+ */
+public interface Metric {
+  /**
+   * Get its unique id.
+   * @return a unique id
+   */
+  String getId();
+
+  /**
+   * Process metric message from evaluators.
+   * @param metricField field name of the metric.
+   * @param metricValue byte array of serialized data value.
+   * @return true if the metric was changed or false if not.
+   */
+  boolean processMetricMessage(final String metricField, final byte[] metricValue);
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
deleted file mode 100644
index ce61cbf61..000000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricData.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.common.metric;
-
-import edu.snu.nemo.common.exception.JsonParseException;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import java.util.Map;
-
-/**
- * MetricData that holds executor side metrics.
- */
-public class MetricData {
-  /**
-   * Computation units are: Job, State, Task.
-   */
-  private final String computationUnitId;
-  private final ObjectMapper objectMapper;
-  private final Map<String, Object> metrics;
-
-  /**
-   * Constructor.
-   * @param computationUnitId the id of the computation unit.
-   * @param metrics the metric data.
-   */
-  MetricData(final String computationUnitId,
-             final Map<String, Object> metrics) {
-    this.computationUnitId = computationUnitId;
-    this.objectMapper = new ObjectMapper();
-    this.metrics = metrics;
-  }
-
-  /**
-   * @return the computation unit id.
-   */
-  public final String getComputationUnitId() {
-    return computationUnitId;
-  }
-
-  /**
-   * @return the metric data.
-   */
-  public final Map<String, Object> getMetrics() {
-    return metrics;
-  }
-
-  /**
-   * @return a JSON expression of the metric data.
-   */
-  public final String toJson() {
-    try {
-      final String jsonStr = objectMapper.writeValueAsString(metrics);
-      return jsonStr;
-    } catch (final Exception e) {
-      throw new JsonParseException(e);
-    }
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricDataBuilder.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricDataBuilder.java
deleted file mode 100644
index 2031edec1..000000000
--- a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/MetricDataBuilder.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package edu.snu.nemo.runtime.common.metric;
-
-import java.util.Map;
-
-/**
- * MetricData Builder.
- */
-public final class MetricDataBuilder {
-  private final String computationUnitId;
-  private long startTime;
-  private long endTime;
-  private Map<String, Object> metrics;
-
-  /**
-   * Constructor.
-   * @param computationUnitId id of the computation unit.
-   */
-  public MetricDataBuilder(final String computationUnitId) {
-    this.computationUnitId = computationUnitId;
-    startTime = 0;
-    endTime = 0;
-    metrics = null;
-  }
-
-  /**
-   * @return the id of the computation unit.
-   */
-  public String getComputationUnitId() {
-    return computationUnitId;
-  }
-
-  /**
-   * @return the metric data.
-   */
-  public Map<String, Object> getMetrics() {
-    return metrics;
-  }
-
-  /**
-   * @return the time at which metric collection starts.
-   */
-  public long getStartTime() {
-    return startTime;
-  }
-
-  /**
-   * @return the time at which metric collection ends.
-   */
-  public long getEndTime() {
-    return endTime;
-  }
-
-  /**
-   * Begin the measurement of metric data.
-   * @param metricMap map on which to collect metrics.
-   */
-  public void beginMeasurement(final Map<String, Object> metricMap) {
-    startTime = System.currentTimeMillis();
-    metricMap.put("StartTime", startTime);
-    this.metrics = metricMap;
-  }
-
-  /**
-   * End the measurement of metric data.
-   * @param metricMap map on which to collect metrics.
-   */
-  public void endMeasurement(final Map<String, Object> metricMap) {
-    endTime = System.currentTimeMillis();
-    metricMap.put("EndTime", endTime);
-    metricMap.put("ElapsedTime(ms)", endTime - startTime);
-    this.metrics.putAll(metricMap);
-  }
-
-  /**
-   * Builds immutable MetricData.
-   * @return the MetricData constructed by the builder.
-   */
-  public MetricData build() {
-    return new MetricData(getComputationUnitId(), getMetrics());
-  }
-}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StageMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StageMetric.java
new file mode 100644
index 000000000..ff174a961
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StageMetric.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import edu.snu.nemo.runtime.common.plan.Stage;
+import edu.snu.nemo.runtime.common.state.StageState;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metric class for {@link Stage}.
+ */
+public class StageMetric implements StateMetric<StageState.State> {
+  private String id;
+  private List<StateTransitionEvent<StageState.State>> stateTransitionEvents = new ArrayList<>();
+
+  public StageMetric(final Stage stage) {
+    this.id = stage.getId();
+  }
+
+  public StageMetric(final String id) {
+    this.id = id;
+  }
+
+  @Override
+  public final String getId() {
+    return id;
+  }
+
+  @Override
+  public final List<StateTransitionEvent<StageState.State>> getStateTransitionEvents() {
+    return stateTransitionEvents;
+  }
+
+  @Override
+  public final void addEvent(final StageState.State prevState, final StageState.State newState) {
+    stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
+  }
+
+  @Override
+  public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
+    // do nothing
+    return false;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
new file mode 100644
index 000000000..426e8e048
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateMetric.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import java.util.List;
+
+/**
+ * Interface for a metric that contains its state.
+ * @param <T> class of state of the metric.
+ */
+public interface StateMetric<T> extends Metric {
+  /**
+   * Get its list of {@link StateTransitionEvent}.
+   * @return list of events.
+   */
+  List<StateTransitionEvent<T>> getStateTransitionEvents();
+
+  /**
+   * Add a {@link StateTransitionEvent} to the metric.
+   * @param prevState previous state.
+   * @param newState new state.
+   */
+  void addEvent(final T prevState, final T newState);
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
new file mode 100644
index 000000000..43ce124ee
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/StateTransitionEvent.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+/**
+ * Event of state transition. It contains timestamp and the state transition.
+ * @param <T> class of state for the metric.
+ */
+public final class StateTransitionEvent<T> extends Event {
+  private T prevState;
+  private T newState;
+
+  public StateTransitionEvent(final long timestamp, final T prevState, final T newState) {
+    super(timestamp);
+    this.prevState = prevState;
+    this.newState = newState;
+  }
+
+  /**
+   * Get previous state.
+   * @return previous state.
+   */
+  public T getPrevState() {
+    return prevState;
+  }
+
+  /**
+   * Get new state.
+   * @return new state.
+   */
+  public T getNewState() {
+    return newState;
+  }
+}
diff --git a/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
new file mode 100644
index 000000000..db24f4d17
--- /dev/null
+++ b/runtime/common/src/main/java/edu/snu/nemo/runtime/common/metric/TaskMetric.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.common.metric;
+
+import edu.snu.nemo.runtime.common.state.TaskState;
+import org.apache.commons.lang3.SerializationUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Metric class for {@link edu.snu.nemo.runtime.common.plan.Task}.
+ */
+public class TaskMetric implements StateMetric<TaskState.State> {
+  private String id;
+  private List<StateTransitionEvent<TaskState.State>> stateTransitionEvents = new ArrayList<>();
+  private long serializedReadBytes = -1;
+  private long encodedReadBytes = -1;
+  private long writtenBytes = -1;
+  private long boundedSourceReadTime = -1;
+  private int scheduleAttempt = -1;
+  private String containerId = "";
+
+  private static final Logger LOG = LoggerFactory.getLogger(TaskMetric.class.getName());
+
+  public TaskMetric(final String id) {
+    this.id = id;
+  }
+
+  public final long getSerializedReadBytes() {
+    return serializedReadBytes;
+  }
+
+  private void setSerializedReadBytes(final long serializedReadBytes) {
+    this.serializedReadBytes = serializedReadBytes;
+  }
+
+  public final long getEncodedReadBytes() {
+    return encodedReadBytes;
+  }
+
+  private void setEncodedReadBytes(final long encodedReadBytes) {
+    this.encodedReadBytes = encodedReadBytes;
+  }
+
+  public final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
+
+  private void setBoundedSourceReadTime(final long boundedSourceReadTime) {
+    this.boundedSourceReadTime = boundedSourceReadTime;
+  }
+
+  public final long getWrittenBytes() {
+    return writtenBytes;
+  }
+
+  private void setWrittenBytes(final long writtenBytes) {
+    this.writtenBytes = writtenBytes;
+  }
+
+  public final int getScheduleAttempt() {
+    return scheduleAttempt;
+  }
+
+  private void setScheduleAttempt(final int scheduleAttempt) {
+    this.scheduleAttempt = scheduleAttempt;
+  }
+
+  public final String getContainerId() {
+    return containerId;
+  }
+
+  private void setContainerId(final String containerId) {
+    this.containerId = containerId;
+  }
+
+  @Override
+  public final List<StateTransitionEvent<TaskState.State>> getStateTransitionEvents() {
+    return stateTransitionEvents;
+  }
+
+  @Override
+  public final String getId() {
+    return id;
+  }
+
+  @Override
+  public final void addEvent(final TaskState.State prevState, final TaskState.State newState) {
+    stateTransitionEvents.add(new StateTransitionEvent<>(System.currentTimeMillis(), prevState, newState));
+  }
+
+  private void addEvent(final StateTransitionEvent<TaskState.State> event) {
+    stateTransitionEvents.add(event);
+  }
+
+  @Override
+  public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
+    LOG.info("metric {} is just arrived!", metricField);
+    switch (metricField) {
+      case "serializedReadBytes":
+        setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
+        break;
+      case "encodedReadBytes":
+        setEncodedReadBytes(SerializationUtils.deserialize(metricValue));
+        break;
+      case "boundedSourceReadTime":
+        setBoundedSourceReadTime(SerializationUtils.deserialize(metricValue));
+        break;
+      case "writtenBytes":
+        setWrittenBytes(SerializationUtils.deserialize(metricValue));
+        break;
+      case "stateTransitionEvent":
+        final StateTransitionEvent<TaskState.State> newStateTransitionEvent =
+            SerializationUtils.deserialize(metricValue);
+        addEvent(newStateTransitionEvent);
+        break;
+      case "scheduleAttempt":
+        setScheduleAttempt(SerializationUtils.deserialize(metricValue));
+        break;
+      case "containerId":
+        setContainerId(SerializationUtils.deserialize(metricValue));
+        break;
+      default:
+        LOG.warn("metricField {} is not supported.", metricField);
+        return false;
+    }
+    return true;
+  }
+}
diff --git a/runtime/common/src/main/proto/ControlMessage.proto b/runtime/common/src/main/proto/ControlMessage.proto
index 7a3cd7f70..b0b174709 100644
--- a/runtime/common/src/main/proto/ControlMessage.proto
+++ b/runtime/common/src/main/proto/ControlMessage.proto
@@ -187,6 +187,8 @@ enum BlockStore {
 
 // Common messages
 message Metric {
-    required string metricKey = 1;
-    required string metricValue = 2;
+    required string metricType = 1;
+    required string metricId = 2;
+    required string metricField = 3;
+    required bytes metricValue = 4;
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricCollector.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricCollector.java
deleted file mode 100644
index 3e53b2e7e..000000000
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricCollector.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2018 Seoul National University
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package edu.snu.nemo.runtime.executor;
-
-import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This metric collector collects metrics and send through {@link MetricMessageSender}.
- */
-public final class MetricCollector {
-
-  private final MetricMessageSender metricMessageSender;
-  private final Map<String, MetricDataBuilder> metricDataBuilderMap;
-
-  /**
-   * Constructor.
-   *
-   * @param metricMessageSender the metric message sender.
-   */
-  public MetricCollector(final MetricMessageSender metricMessageSender) {
-    this.metricMessageSender = metricMessageSender;
-    this.metricDataBuilderMap = new HashMap<>();
-  }
-
-  /**
-   * Begins recording the start time of this metric measurement, in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   *
-   * @param compUnitId    to be used as metricKey
-   * @param initialMetric metric to add
-   */
-  public void beginMeasurement(final String compUnitId, final Map<String, Object> initialMetric) {
-    final MetricDataBuilder metricDataBuilder = new MetricDataBuilder(compUnitId);
-    metricDataBuilder.beginMeasurement(initialMetric);
-    metricDataBuilderMap.put(compUnitId, metricDataBuilder);
-  }
-
-  /**
-   * Ends this metric measurement, recording the end time in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   *
-   * @param compUnitId  to be used as metricKey
-   * @param finalMetric metric to add
-   */
-  public void endMeasurement(final String compUnitId, final Map<String, Object> finalMetric) {
-    final MetricDataBuilder metricDataBuilder = metricDataBuilderMap.get(compUnitId);
-    metricDataBuilder.endMeasurement(finalMetric);
-    metricMessageSender.send(compUnitId, metricDataBuilder.build().toJson());
-    metricDataBuilderMap.remove(compUnitId);
-  }
-}
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
index f6ffd0842..405ce4cd4 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricManagerWorker.java
@@ -15,14 +15,13 @@
  */
 package edu.snu.nemo.runtime.executor;
 
+import com.google.protobuf.ByteString;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.common.exception.UnknownFailureCauseException;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
-import edu.snu.nemo.runtime.common.metric.parameter.MetricFlushPeriod;
 import org.apache.reef.annotations.audience.EvaluatorSide;
-import org.apache.reef.tang.annotations.Parameter;
 
 import javax.inject.Inject;
 import java.util.concurrent.*;
@@ -40,17 +39,17 @@
   private final BlockingQueue<ControlMessage.Metric> metricMessageQueue;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
+  private static final int FLUSHING_PERIOD = 3000;
   private static final Logger LOG = LoggerFactory.getLogger(MetricManagerWorker.class.getName());
 
   @Inject
-  private MetricManagerWorker(@Parameter(MetricFlushPeriod.class) final long flushingPeriod,
-                              final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
+  private MetricManagerWorker(final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
     this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
     this.metricMessageQueue = new LinkedBlockingQueue<>();
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
     final Runnable batchMetricMessages = () -> flushMetricMessageQueueToMaster();
     this.scheduledExecutorService.scheduleAtFixedRate(batchMetricMessages, 0,
-                                                      flushingPeriod, TimeUnit.MILLISECONDS);
+                                                      FLUSHING_PERIOD, TimeUnit.MILLISECONDS);
   }
 
   @Override
@@ -89,15 +88,19 @@ private synchronized void flushMetricMessageQueueToMaster() {
   }
 
   @Override
-  public void send(final String metricKey, final String metricValue) {
-    LOG.debug("Executor logged! {}", metricKey);
+  public void send(final String metricType, final String metricId,
+                   final String metricField, final byte[] metricValue) {
     metricMessageQueue.add(
-        ControlMessage.Metric.newBuilder().setMetricKey(metricKey).setMetricValue(metricValue).build());
+        ControlMessage.Metric.newBuilder()
+            .setMetricType(metricType)
+            .setMetricId(metricId)
+            .setMetricField(metricField)
+            .setMetricValue(ByteString.copyFrom(metricValue))
+            .build());
   }
 
   @Override
   public void close() throws UnknownFailureCauseException {
-    LOG.info("Shutting down MetricManager ");
     scheduledExecutorService.shutdownNow();
     flushMetricMessageQueueToMaster();
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
index dcd515590..a7693f6d9 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/MetricMessageSender.java
@@ -25,10 +25,12 @@
 
   /**
    * Send metric to master.
-   * @param metricKey key of the metric
-   * @param metricValue value of the metric
+   * @param metricType type of the metric
+   * @param metricId id of the metric
+   * @param metricField field of the metric
+   * @param metricValue serialized value of the metric
    */
-  void send(final String metricKey, final String metricValue);
+  void send(final String metricType, final String metricId, final String metricField, final byte[] metricValue);
 
   /**
    * Flush all metric inside of the queue.
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
index 964562185..a707b0321 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/TaskStateManager.java
@@ -21,11 +21,13 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import edu.snu.nemo.runtime.common.metric.StateTransitionEvent;
 import edu.snu.nemo.runtime.common.plan.Task;
 
 import java.util.*;
 
 import edu.snu.nemo.runtime.common.state.TaskState;
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.reef.annotations.audience.EvaluatorSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,7 +43,7 @@
   private final String taskId;
   private final int attemptIdx;
   private final String executorId;
-  private final MetricCollector metricCollector;
+  private final MetricMessageSender metricMessageSender;
   private final PersistentConnectionToMasterMap persistentConnectionToMasterMap;
 
   public TaskStateManager(final Task task,
@@ -52,7 +54,12 @@ public TaskStateManager(final Task task,
     this.attemptIdx = task.getAttemptIdx();
     this.executorId = executorId;
     this.persistentConnectionToMasterMap = persistentConnectionToMasterMap;
-    this.metricCollector = new MetricCollector(metricMessageSender);
+    this.metricMessageSender = metricMessageSender;
+
+    metricMessageSender.send("TaskMetric", taskId,
+        "containerId", SerializationUtils.serialize(executorId));
+    metricMessageSender.send("TaskMetric", taskId,
+        "scheduleAttempt", SerializationUtils.serialize(attemptIdx));
   }
 
   /**
@@ -64,32 +71,25 @@ public TaskStateManager(final Task task,
   public synchronized void onTaskStateChanged(final TaskState.State newState,
                                               final Optional<String> vertexPutOnHold,
                                               final Optional<TaskState.RecoverableTaskFailureCause> cause) {
-    final Map<String, Object> metric = new HashMap<>();
+    metricMessageSender.send("TaskMetric", taskId,
+        "stateTransitionEvent", SerializationUtils.serialize(new StateTransitionEvent<>(
+            System.currentTimeMillis(), null, newState
+        )));
 
     switch (newState) {
       case EXECUTING:
         LOG.debug("Executing Task ID {}...", this.taskId);
-        metric.put("ContainerId", executorId);
-        metric.put("ScheduleAttempt", attemptIdx);
-        metric.put("FromState", newState);
-        metricCollector.beginMeasurement(taskId, metric);
         break;
       case COMPLETE:
         LOG.debug("Task ID {} complete!", this.taskId);
-        metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case SHOULD_RETRY:
         LOG.debug("Task ID {} failed (recoverable).", this.taskId);
-        metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case FAILED:
         LOG.debug("Task ID {} failed (unrecoverable).", this.taskId);
-        metric.put("ToState", newState);
-        metricCollector.endMeasurement(taskId, metric);
         notifyTaskStateToMaster(newState, Optional.empty(), cause);
         break;
       case ON_HOLD:
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
index 3dbc6890d..c7aeb0e65 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/DataFetcher.java
@@ -18,7 +18,6 @@
 import edu.snu.nemo.common.ir.vertex.IRVertex;
 
 import java.io.IOException;
-import java.util.Map;
 
 /**
  * An abstraction for fetching data from task-external sources.
@@ -26,18 +25,15 @@
 abstract class DataFetcher {
   private final IRVertex dataSource;
   private final VertexHarness child;
-  private final Map<String, Object> metricMap;
   private final boolean isToSideInput;
   private final boolean isFromSideInput;
 
   DataFetcher(final IRVertex dataSource,
               final VertexHarness child,
-              final Map<String, Object> metricMap,
               final boolean isFromSideInput,
               final boolean isToSideInput) {
     this.dataSource = dataSource;
     this.child = child;
-    this.metricMap = metricMap;
     this.isToSideInput = isToSideInput;
     this.isFromSideInput = isFromSideInput;
   }
@@ -50,10 +46,6 @@
    */
   abstract Object fetchDataElement() throws IOException;
 
-  protected Map<String, Object> getMetricMap() {
-    return metricMap;
-  }
-
   VertexHarness getChild() {
     return child;
   }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index 1f4276add..68ba2c111 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -24,7 +24,6 @@
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -44,22 +43,21 @@
   private DataUtil.IteratorWithNumBytes currentIterator;
   private int currentIteratorIndex;
   private boolean noElementAtAll = true;
+  private long serBytes = 0;
+  private long encodedBytes = 0;
 
   ParentTaskDataFetcher(final IRVertex dataSource,
                         final InputReader readerForParentTask,
                         final VertexHarness child,
-                        final Map<String, Object> metricMap,
                         final boolean isToSideInput) {
-    super(dataSource, child, metricMap, readerForParentTask.isSideInputReader(), isToSideInput);
+    super(dataSource, child, readerForParentTask.isSideInputReader(), isToSideInput);
     this.readersForParentTask = readerForParentTask;
     this.hasFetchStarted = false;
     this.currentIteratorIndex = 0;
     this.iteratorQueue = new LinkedBlockingQueue<>();
   }
 
-  private void handleMetric(final DataUtil.IteratorWithNumBytes iterator) {
-    long serBytes = 0;
-    long encodedBytes = 0;
+  private void countBytes(final DataUtil.IteratorWithNumBytes iterator) {
     try {
       serBytes += iterator.getNumSerializedBytes();
     } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
@@ -74,10 +72,6 @@ private void handleMetric(final DataUtil.IteratorWithNumBytes iterator) {
     } catch (final IllegalStateException e) {
       LOG.error("Failed to get the number of bytes of encoded data - the data is not ready yet ", e);
     }
-    if (serBytes != encodedBytes) {
-      getMetricMap().put("ReadBytes(raw)", serBytes);
-    }
-    getMetricMap().put("ReadBytes", encodedBytes);
   }
 
   /**
@@ -126,7 +120,7 @@ Object fetchDataElement() throws IOException {
           }
         } else {
           // Advance to the next one
-          handleMetric(currentIterator);
+          countBytes(currentIterator);
           advanceIterator();
           return fetchDataElement();
         }
@@ -160,4 +154,12 @@ private void advanceIterator() throws Throwable {
       this.currentIteratorIndex++;
     }
   }
+
+  public final long getSerializedBytes() {
+    return serBytes;
+  }
+
+  public final long getEncodedBytes() {
+    return encodedBytes;
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 116b9c410..817139b3d 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -20,7 +20,6 @@
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.Map;
 
 /**
  * Fetches data from a data source.
@@ -30,13 +29,13 @@
 
   // Non-finals (lazy fetching)
   private Iterator iterator;
+  private long boundedSourceReadTime = 0;
 
   SourceVertexDataFetcher(final IRVertex dataSource,
                           final Readable readable,
                           final VertexHarness child,
-                          final Map<String, Object> metricMap,
                           final boolean isToSideInput) {
-    super(dataSource, child, metricMap, false, isToSideInput);
+    super(dataSource, child, false, isToSideInput);
     this.readable = readable;
   }
 
@@ -45,7 +44,7 @@ Object fetchDataElement() throws IOException {
     if (iterator == null) {
       final long start = System.currentTimeMillis();
       iterator = this.readable.read().iterator();
-      getMetricMap().put("BoundedSourceReadTime(ms)", System.currentTimeMillis() - start);
+      boundedSourceReadTime += System.currentTimeMillis() - start;
     }
 
     if (iterator.hasNext()) {
@@ -54,4 +53,8 @@ Object fetchDataElement() throws IOException {
       return null;
     }
   }
+
+  public final long getBoundedSourceReadTime() {
+    return boundedSourceReadTime;
+  }
 }
diff --git a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
index 78974b458..741bb300b 100644
--- a/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/edu/snu/nemo/runtime/executor/task/TaskExecutor.java
@@ -31,7 +31,6 @@
 import edu.snu.nemo.runtime.common.plan.StageEdge;
 import edu.snu.nemo.runtime.common.plan.RuntimeEdge;
 import edu.snu.nemo.runtime.common.state.TaskState;
-import edu.snu.nemo.runtime.executor.MetricCollector;
 import edu.snu.nemo.runtime.executor.MetricMessageSender;
 import edu.snu.nemo.runtime.executor.TaskStateManager;
 import edu.snu.nemo.runtime.executor.datatransfer.*;
@@ -39,6 +38,8 @@
 import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,8 +64,10 @@
   private final Map sideInputMap;
 
   // Metrics information
-  private final Map<String, Object> metricMap;
-  private final MetricCollector metricCollector;
+  private long boundedSourceReadTime = 0;
+  private long serializedReadBytes = 0;
+  private long encodedReadBytes = 0;
+  private final MetricMessageSender metricMessageSender;
 
   // Dynamic optimization
   private String idOfVertexPutOnHold;
@@ -90,9 +93,8 @@ public TaskExecutor(final Task task,
     this.taskId = task.getTaskId();
     this.taskStateManager = taskStateManager;
 
-    // Metrics information
-    this.metricMap = new HashMap<>();
-    this.metricCollector = new MetricCollector(metricMessageSender);
+    // Metric sender
+    this.metricMessageSender = metricMessageSender;
 
     // Dynamic optimization
     // Assigning null is very bad, but we are keeping this for now
@@ -174,13 +176,13 @@ public TaskExecutor(final Task task,
       final boolean isToSideInput = isToSideInputs.stream().anyMatch(bool -> bool);
       if (irVertex instanceof SourceVertex) {
         dataFetcherList.add(new SourceVertexDataFetcher(
-            irVertex, sourceReader.get(), vertexHarness, metricMap, isToSideInput)); // Source vertex read
+            irVertex, sourceReader.get(), vertexHarness, isToSideInput)); // Source vertex read
       }
       final List<InputReader> parentTaskReaders =
           getParentTaskReaders(taskIndex, irVertex, task.getTaskIncomingEdges(), dataTransferFactory);
       parentTaskReaders.forEach(parentTaskReader -> {
         dataFetcherList.add(new ParentTaskDataFetcher(parentTaskReader.getSrcIrVertex(), parentTaskReader,
-            vertexHarness, metricMap, isToSideInput)); // Parent-task read
+            vertexHarness, isToSideInput)); // Parent-task read
       });
     });
 
@@ -254,7 +256,6 @@ private void doExecute() {
     }
     LOG.info("{} started", taskId);
     taskStateManager.onTaskStateChanged(TaskState.State.EXECUTING, Optional.empty(), Optional.empty());
-    metricCollector.beginMeasurement(taskId, metricMap);
 
     // Phase 1: Consume task-external side-input related data.
     final Map<Boolean, List<DataFetcher>> sideInputRelated = dataFetchers.stream()
@@ -277,6 +278,13 @@ private void doExecute() {
       return;
     }
 
+    metricMessageSender.send("TaskMetric", taskId,
+        "boundedSourceReadTime", SerializationUtils.serialize(boundedSourceReadTime));
+    metricMessageSender.send("TaskMetric", taskId,
+        "serializedReadBytes", SerializationUtils.serialize(serializedReadBytes));
+    metricMessageSender.send("TaskMetric", taskId,
+        "encodedReadBytes", SerializationUtils.serialize(encodedReadBytes));
+
     // Phase 3: Finalize task-internal states and elements
     for (final VertexHarness vertexHarness : sortedHarnesses) {
       if (finalizeLater.contains(vertexHarness)) {
@@ -284,8 +292,6 @@ private void doExecute() {
       }
     }
 
-    // Miscellaneous: Metrics, DynOpt, etc
-    metricCollector.endMeasurement(taskId, metricMap);
     if (idOfVertexPutOnHold == null) {
       taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
       LOG.info("{} completed", taskId);
@@ -357,6 +363,12 @@ private boolean handleDataFetchers(final List<DataFetcher> fetchers) {
         }
 
         if (element == null) {
+          if (dataFetcher instanceof SourceVertexDataFetcher) {
+            boundedSourceReadTime += ((SourceVertexDataFetcher) dataFetcher).getBoundedSourceReadTime();
+          } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+            serializedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getSerializedBytes();
+            encodedReadBytes += ((ParentTaskDataFetcher) dataFetcher).getEncodedBytes();
+          }
           finishedFetcherIndex = i;
           break;
         } else {
@@ -470,11 +482,6 @@ private void setIRVertexPutOnHold(final MetricCollectionBarrierVertex irVertex)
    */
   private void finalizeOutputWriters(final VertexHarness vertexHarness) {
     final List<Long> writtenBytesList = new ArrayList<>();
-    final Map<String, Object> metric = new HashMap<>();
-    final IRVertex irVertex = vertexHarness.getIRVertex();
-
-    metricCollector.beginMeasurement(irVertex.getId(), metric);
-    final long writeStartTime = System.currentTimeMillis();
 
     vertexHarness.getWritersToChildrenTasks().forEach(outputWriter -> {
       outputWriter.close();
@@ -482,15 +489,11 @@ private void finalizeOutputWriters(final VertexHarness vertexHarness) {
       writtenBytes.ifPresent(writtenBytesList::add);
     });
 
-    final long writeEndTime = System.currentTimeMillis();
-    metric.put("OutputWriteTime(ms)", writeEndTime - writeStartTime);
-    if (!writtenBytesList.isEmpty()) {
-      long totalWrittenBytes = 0;
-      for (final Long writtenBytes : writtenBytesList) {
-        totalWrittenBytes += writtenBytes;
-      }
-      metricMap.put("WrittenBytes", totalWrittenBytes);
+    long totalWrittenBytes = 0;
+    for (final Long writtenBytes : writtenBytesList) {
+      totalWrittenBytes += writtenBytes;
     }
-    metricCollector.endMeasurement(irVertex.getId(), metric);
+    metricMessageSender.send("TaskMetric", taskId,
+        "writtenBytes", SerializationUtils.serialize(totalWrittenBytes));
   }
 }
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
index e2b3826d3..931e7515f 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/ParentTaskDataFetcherTest.java
@@ -108,7 +108,6 @@ private ParentTaskDataFetcher createFetcher(final InputReader readerForParentTas
         mock(IRVertex.class),
         readerForParentTask, // This is the only argument that affects the behavior of ParentTaskDataFetcher
         mock(VertexHarness.class),
-        new HashMap<>(0),
         false);
   }
 
diff --git a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
index b6b5bc4fd..f9c46632b 100644
--- a/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/edu/snu/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -102,7 +102,7 @@ public void setUp() throws Exception {
 
     // Mock a MetricMessageSender.
     metricMessageSender = mock(MetricMessageSender.class);
-    doNothing().when(metricMessageSender).send(anyString(), anyString());
+    doNothing().when(metricMessageSender).send(anyString(), anyString(), anyString(), any());
     doNothing().when(metricMessageSender).close();
 
     persistentConnectionToMasterMap = mock(PersistentConnectionToMasterMap.class);
diff --git a/runtime/master/pom.xml b/runtime/master/pom.xml
index 6cb5abc71..8ea468804 100644
--- a/runtime/master/pom.xml
+++ b/runtime/master/pom.xml
@@ -51,6 +51,26 @@ limitations under the License.
             <artifactId>nemo-runtime-plangenerator</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>${jetty-server.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-servlet</artifactId>
+            <version>${jetty-servlet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty.websocket</groupId>
+            <artifactId>websocket-api</artifactId>
+            <version>${jetty-servlet.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty.websocket</groupId>
+            <artifactId>websocket-server</artifactId>
+            <version>${jetty-servlet.version}</version>
+        </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-core</artifactId>
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
index 2a0ef882a..fd3e582ec 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/JobStateManager.java
@@ -20,7 +20,6 @@
 import edu.snu.nemo.common.exception.SchedulingException;
 import edu.snu.nemo.common.exception.UnknownExecutionStateException;
 import edu.snu.nemo.common.StateMachine;
-import edu.snu.nemo.runtime.common.metric.MetricDataBuilder;
 import edu.snu.nemo.runtime.common.RuntimeIdGenerator;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.plan.Stage;
@@ -37,6 +36,9 @@
 import java.util.concurrent.locks.ReentrantLock;
 
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.common.metric.JobMetric;
+import edu.snu.nemo.runtime.common.metric.StageMetric;
+import edu.snu.nemo.runtime.common.metric.TaskMetric;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,7 +89,8 @@
    * For metrics.
    */
   private final MetricMessageHandler metricMessageHandler;
-  private final Map<String, MetricDataBuilder> metricDataBuilderMap;
+
+  private MetricStore metricStore;
 
   public JobStateManager(final PhysicalPlan physicalPlan,
                          final MetricMessageHandler metricMessageHandler,
@@ -102,7 +105,10 @@ public JobStateManager(final PhysicalPlan physicalPlan,
     this.taskIdToCurrentAttempt = new HashMap<>();
     this.finishLock = new ReentrantLock();
     this.jobFinishedCondition = finishLock.newCondition();
-    this.metricDataBuilderMap = new HashMap<>();
+    this.metricStore = MetricStore.getStore();
+
+    metricStore.getOrCreateMetric(JobMetric.class, jobId).setStageDAG(physicalPlan.getStageDAG());
+    metricStore.triggerBroadcast(JobMetric.class, jobId);
     initializeComputationStates();
   }
 
@@ -140,25 +146,21 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
     LOG.debug("Task State Transition: id {}, from {} to {}",
         new Object[]{taskId, taskState.getCurrentState(), newTaskState});
 
+    metricStore.getOrCreateMetric(TaskMetric.class, taskId)
+        .addEvent((TaskState.State) taskState.getCurrentState(), newTaskState);
+    metricStore.triggerBroadcast(TaskMetric.class, taskId);
+
     taskState.setState(newTaskState);
 
-    // Handle metrics
-    final Map<String, Object> metric = new HashMap<>();
     switch (newTaskState) {
       case ON_HOLD:
       case COMPLETE:
       case FAILED:
       case SHOULD_RETRY:
-        metric.put("ToState", newTaskState);
-        endMeasurement(taskId, metric);
-        break;
       case EXECUTING:
-        metric.put("FromState", newTaskState);
-        beginMeasurement(taskId, metric);
         break;
       case READY:
         final int currentAttempt = taskIdToCurrentAttempt.get(taskId) + 1;
-        metric.put("ScheduleAttempt", currentAttempt);
         if (currentAttempt <= maxScheduleAttempt) {
           taskIdToCurrentAttempt.put(taskId, currentAttempt);
         } else {
@@ -215,21 +217,16 @@ public synchronized void onTaskStateChanged(final String taskId, final TaskState
   private void onStageStateChanged(final String stageId, final StageState.State newStageState) {
     // Change stage state
     final StateMachine stageStateMachine = idToStageStates.get(stageId).getStateMachine();
+
+    metricStore.getOrCreateMetric(StageMetric.class, stageId)
+        .addEvent(getStageState(stageId), newStageState);
+    metricStore.triggerBroadcast(StageMetric.class, stageId);
+
     LOG.debug("Stage State Transition: id {} from {} to {}",
         new Object[]{stageId, stageStateMachine.getCurrentState(), newStageState});
     stageStateMachine.setState(newStageState);
 
-    // Metric handling
-    final Map<String, Object> metric = new HashMap<>();
-    if (newStageState == StageState.State.INCOMPLETE) {
-      metric.put("FromState", newStageState);
-      beginMeasurement(stageId, metric);
-    } else if (newStageState == StageState.State.COMPLETE) {
-      metric.put("ToState", newStageState);
-      endMeasurement(stageId, metric);
-    }
-
-    // Job becomse COMPLETE
+    // Change job state if needed
     final boolean allStagesCompleted = idToStageStates.values().stream().allMatch(state ->
         state.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE));
     if (allStagesCompleted) {
@@ -243,20 +240,19 @@ private void onStageStateChanged(final String stageId, final StageState.State ne
    * @param newState of the job.
    */
   private void onJobStateChanged(final JobState.State newState) {
+    metricStore.getOrCreateMetric(JobMetric.class, jobId)
+        .addEvent((JobState.State) jobState.getStateMachine().getCurrentState(), newState);
+    metricStore.triggerBroadcast(JobMetric.class, jobId);
+
     jobState.getStateMachine().setState(newState);
 
-    final Map<String, Object> metric = new HashMap<>();
     if (newState == JobState.State.EXECUTING) {
-      LOG.info("Executing Job ID {}...", this.jobId);
-      metric.put("FromState", newState);
-      beginMeasurement(jobId, metric);
+      LOG.debug("Executing Job ID {}...", this.jobId);
     } else if (newState == JobState.State.COMPLETE || newState == JobState.State.FAILED) {
       LOG.info("Job ID {} {}!", new Object[]{jobId, newState});
 
       // Awake all threads waiting the finish of this job.
       finishLock.lock();
-      metric.put("ToState", newState);
-      endMeasurement(jobId, metric);
 
       try {
         jobFinishedCondition.signalAll();
@@ -344,36 +340,6 @@ public synchronized int getTaskAttempt(final String taskId) {
     return idToTaskStates;
   }
 
-  /**
-   * Begins recording the start time of this metric measurement, in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   * @param compUnitId to be used as metricKey
-   * @param initialMetric metric to add
-   */
-  private void beginMeasurement(final String compUnitId, final Map<String, Object> initialMetric) {
-    final MetricDataBuilder metricDataBuilder = new MetricDataBuilder(compUnitId);
-    metricDataBuilder.beginMeasurement(initialMetric);
-    metricDataBuilderMap.put(compUnitId, metricDataBuilder);
-  }
-
-  /**
-   * Ends this metric measurement, recording the end time in addition to the metric given.
-   * This method ensures thread-safety by synchronizing its callers.
-   * @param compUnitId to be used as metricKey
-   * @param finalMetric metric to add
-   */
-  private void endMeasurement(final String compUnitId, final Map<String, Object> finalMetric) {
-    final MetricDataBuilder metricDataBuilder = metricDataBuilderMap.get(compUnitId);
-
-    // may be null when a Task fails without entering the executing state (due to an input read failure)
-    if (metricDataBuilder != null) {
-      finalMetric.put("ContainerId", "Master");
-      metricDataBuilder.endMeasurement(finalMetric);
-      metricMessageHandler.onMetricMessageReceived(compUnitId, metricDataBuilder.build().toJson());
-      metricDataBuilderMap.remove(compUnitId);
-    }
-  }
-
   /**
    * Stores JSON representation of job state into a file.
    * @param directory the directory which JSON representation is saved to
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricBroadcaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricBroadcaster.java
new file mode 100644
index 000000000..f9f0be612
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricBroadcaster.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import org.eclipse.jetty.websocket.api.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * MetricBroadcaster broadcasts metric changes to the currently active WebSocket sessions.
+ */
+public final class MetricBroadcaster {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricBroadcaster.class.getName());
+  private final Set<Session> sessions = ConcurrentHashMap.newKeySet();
+  /**
+   * Private constructor.
+   */
+  private MetricBroadcaster() { }
+
+  /**
+   * Getter for the singleton object.
+   * @return MetricBroadcaster object.
+   */
+  public static MetricBroadcaster getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Lazy class object holder for MetricBroadcaster class.
+   */
+  private static class InstanceHolder {
+    private static final MetricBroadcaster INSTANCE = new MetricBroadcaster();
+  }
+
+  /**
+   * Add a session to the session list.
+   * @param session a WebSocket session.
+   */
+  public synchronized void addSession(final Session session) {
+    try {
+      session.getRemote().sendString(MetricStore.getStore().dumpAllMetricToJson());
+    } catch (final IOException e) {
+      LOG.warn("Failed to send initial metric to newly connected session.");
+    }
+    sessions.add(session);
+  }
+
+  /**
+   * Remove a session from the session list.
+   * @param session a WebSocket session.
+   */
+  public synchronized void removeSession(final Session session) {
+    sessions.remove(session);
+  }
+
+  /**
+   * Send text frame to each WebSocket session.
+   * @param text text to send.
+   */
+  public void broadcast(final String text) {
+    for (final Session session : sessions) {
+      try {
+        session.getRemote().sendString(text);
+      } catch (final IOException e) {
+        LOG.warn("Failed to send string to remote session {}.", session.getRemoteAddress().toString());
+      }
+    }
+  }
+
+  /**
+   * Send binary frame to each WebSocket session.
+   * @param bytes byte array to send.
+   */
+  public void broadcast(final byte[] bytes) {
+    for (final Session session : sessions) {
+      try {
+        session.getRemote().sendBytes(ByteBuffer.wrap(bytes));
+      } catch (final IOException e) {
+        LOG.warn("Failed to send binary to remote session {}.", session.getRemoteAddress().toString());
+      }
+    }
+  }
+
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
index 5b6a8fdcc..4a266d548 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricManagerMaster.java
@@ -21,15 +21,11 @@
 import edu.snu.nemo.runtime.common.comm.ControlMessage;
 import edu.snu.nemo.runtime.common.message.MessageEnvironment;
 import edu.snu.nemo.runtime.master.scheduler.ExecutorRegistry;
+import edu.snu.nemo.runtime.common.metric.Metric;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
 /**
  * A default metric message handler.
  */
@@ -37,13 +33,12 @@
 public final class MetricManagerMaster implements MetricMessageHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(MetricManagerMaster.class.getName());
-  private final Map<String, List<String>> compUnitIdToMetricInJson;
+  private final MetricStore metricStore = MetricStore.getStore();
   private boolean isTerminated;
   private final ExecutorRegistry executorRegistry;
 
   @Inject
   private MetricManagerMaster(final ExecutorRegistry executorRegistry) {
-    this.compUnitIdToMetricInJson = new HashMap<>();
     this.isTerminated = false;
     this.executorRegistry = executorRegistry;
   }
@@ -60,24 +55,25 @@ public synchronized void sendMetricFlushRequest() {
   }
 
   @Override
-  public synchronized void onMetricMessageReceived(final String metricKey, final String metricValue) {
+  public synchronized void onMetricMessageReceived(final String metricType,
+                                                   final String metricId,
+                                                   final String metricField,
+                                                   final byte[] metricValue) {
     if (!isTerminated) {
-      compUnitIdToMetricInJson.putIfAbsent(metricKey, new LinkedList<>());
-      compUnitIdToMetricInJson.get(metricKey).add(metricValue);
-      LOG.debug("{\"computationUnitId\":\"{}\", \"metricList\":{}}", metricKey, metricValue);
+      // process metric message; an unknown metric type is logged like any other processing failure
+      try {
+        final Class<Metric> metricClass = metricStore.getMetricClassByName(metricType);
+        if (metricStore.getOrCreateMetric(metricClass, metricId).processMetricMessage(metricField, metricValue)) {
+          metricStore.triggerBroadcast(metricClass, metricId);
+        }
+      } catch (final Exception e) {
+        LOG.warn("Error when processing metric message for {}, {}, {}.", metricType, metricId, metricField, e);
+      }
     }
   }
 
-  @Override
-  public synchronized List<String> getMetricByKey(final String metricKey) {
-    return compUnitIdToMetricInJson.get(metricKey);
-  }
-
   @Override
   public synchronized void terminate() {
-    compUnitIdToMetricInJson.forEach((compUnitId, metricList) ->
-        LOG.info("{\"computationUnitId\":\"{}\", \"metricList\":{}}", compUnitId, metricList));
-    compUnitIdToMetricInJson.clear();
     isTerminated = true;
   }
 }
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
index 334bc0c00..15ee0834a 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricMessageHandler.java
@@ -17,8 +17,6 @@
 
 import org.apache.reef.tang.annotations.DefaultImplementation;
 
-import java.util.List;
-
 /**
  * Metric message handler.
  */
@@ -27,17 +25,13 @@
 
   /**
    * Handle the received metric message.
-   * @param metricKey a given key for the metric (ex. Task ID)
-   * @param metricValue the metric formatted as a string (ex. JSON).
-   */
-  void onMetricMessageReceived(final String metricKey, final String metricValue);
-
-  /**
-   * Retrieves the string form of metric given the metric key.
-   * @param metricKey to retrieve the metric for
-   * @return the list of accumulated metric in string (ex. JSON)
+   * @param metricType name of the metric type (ex. TaskMetric).
+   * @param metricId id of the metric.
+   * @param metricField field name of the metric.
+   * @param metricValue serialized metric data value.
    */
-  List<String> getMetricByKey(final String metricKey);
+  void onMetricMessageReceived(final String metricType, final String metricId,
+                               final String metricField, final byte[] metricValue);
 
   /**
    * Cleans up and terminates this handler.
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
new file mode 100644
index 000000000..09c7c9d35
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/MetricStore.java
@@ -0,0 +1,254 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.common.exception.UnsupportedMetricException;
+import edu.snu.nemo.runtime.common.metric.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * MetricStore stores metric data which will be used by web visualize interface, logging, and so on.
+ * All metric classes should be JSON-serializable by {@link ObjectMapper}.
+ */
+public final class MetricStore {
+  // The JSON generators below write UTF-8 bytes, so their output has to be decoded as UTF-8 as well.
+  private static final String JSON_CHARSET = "UTF-8";
+  private final Map<Class, Map<String, Object>> metricMap = new HashMap<>();
+  // You can add more metrics by adding item to this metricList list.
+  private final Map<String, Class> metricList = new HashMap<String, Class>() {
+    {
+      put("JobMetric", JobMetric.class);
+      put("StageMetric", StageMetric.class);
+      put("TaskMetric", TaskMetric.class);
+    }
+  };
+  // Shared JSON mapper, reused by every dump and broadcast instead of being rebuilt per call.
+  private final ObjectMapper jsonMapper = new ObjectMapper();
+
+  /**
+   * Private constructor.
+   */
+  private MetricStore() { }
+
+  /**
+   * Getter for singleton instance.
+   * @return MetricStore object.
+   */
+  public static MetricStore getStore() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  /**
+   * Lazy class object holder for MetricStore class.
+   */
+  private static final class InstanceHolder {
+    private static final MetricStore INSTANCE = new MetricStore();
+  }
+
+  /**
+   * Look up a supported metric class by its name.
+   * @param className name of the metric class (ex. TaskMetric).
+   * @param <T> class of metric
+   * @return the metric class registered under the given name.
+   * @throws NoSuchElementException when the name is not a supported metric.
+   */
+  public <T extends Metric> Class<T> getMetricClassByName(final String className) {
+    final Class metricClass = metricList.get(className);
+    if (metricClass == null) {
+      throw new NoSuchElementException("Unsupported metric type: " + className);
+    }
+    return metricClass;
+  }
+
+  /**
+   * Store a metric object. Metric object should implement {@link Metric} interface.
+   * This method will store a metric into a {@link Map}, which has metric's id as its key.
+   * If a metric with the same id is already stored, it is kept and the given one is ignored.
+   * @param metric metric object.
+   * @param <T> class of metric
+   */
+  public synchronized <T extends Metric> void putMetric(final T metric) {
+    final Class metricClass = metric.getClass();
+    if (!metricList.containsValue(metricClass)) {
+      throw new UnsupportedMetricException(new Throwable("Unsupported metric"));
+    }
+
+    metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).putIfAbsent(metric.getId(), metric);
+  }
+
+  /**
+   * Fetch metric by its metric class instance and its id.
+   * @param metricClass class instance of metric.
+   * @param id metric id, which can be fetched by getId() method.
+   * @param <T> class of metric
+   * @return a metric object.
+   */
+  public synchronized <T extends Metric> T getMetricWithId(final Class<T> metricClass, final String id) {
+    final T metric = (T) metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).get(id);
+    if (metric == null) {
+      throw new NoSuchElementException("No metric found");
+    }
+    return metric;
+  }
+
+  /**
+   * Fetch metric map by its metric class instance. The map is created on first access.
+   * @param metricClass class instance of metric.
+   * @param <T> class of metric
+   * @return a map from metric id to metric object, never null.
+   */
+  public synchronized <T extends Metric> Map<String, Object> getMetricMap(final Class<T> metricClass) {
+    return metricMap.computeIfAbsent(metricClass, k -> new HashMap<>());
+  }
+
+  /**
+   * Same as getMetricWithId(), but if there is no such metric, it will try to create new metric object
+   * using its constructor, which takes an id as a parameter.
+   * @param metricClass class of metric.
+   * @param id metric id, which can be fetched by getId() method.
+   * @param <T> class of metric
+   * @return a metric object. If there was no such metric, newly create one.
+   */
+  public synchronized <T extends Metric> T getOrCreateMetric(final Class<T> metricClass, final String id) {
+    T metric = (T) metricMap.computeIfAbsent(metricClass, k -> new HashMap<>()).get(id);
+    if (metric == null) {
+      try {
+        metric = metricClass.getConstructor(String.class).newInstance(id);
+        putMetric(metric);
+      } catch (final Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return metric;
+  }
+
+  /** Write one metric as a field named by its id, holding an object with "id" and "data" fields. */
+  private void generatePreprocessedJsonFromMetricEntry(final Map.Entry<String, Object> idToMetricEntry,
+                                                       final JsonGenerator jsonGenerator,
+                                                       final ObjectMapper objectMapper) throws IOException {
+    final JsonNode jsonNode = objectMapper.valueToTree(idToMetricEntry.getValue());
+    jsonGenerator.writeFieldName(idToMetricEntry.getKey());
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeFieldName("id");
+    jsonGenerator.writeString(idToMetricEntry.getKey());
+    jsonGenerator.writeFieldName("data");
+    jsonGenerator.writeTree(jsonNode);
+    jsonGenerator.writeEndObject();
+  }
+
+  /**
+   * Dumps JSON-serialized string of specific metric.
+   * @param metricClass class of metric.
+   * @param <T> class of metric
+   * @return dumped JSON string of the metrics of the given class.
+   * @throws IOException when failed to write json.
+   */
+  public synchronized <T extends Metric> String dumpMetricToJson(final Class<T> metricClass) throws IOException {
+    final JsonFactory jsonFactory = new JsonFactory();
+    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
+    jsonGenerator.setCodec(jsonMapper);
+
+    jsonGenerator.writeStartObject();
+    jsonGenerator.writeFieldName(metricClass.getSimpleName());
+    jsonGenerator.writeStartObject();
+    for (final Map.Entry<String, Object> idToMetricEntry : getMetricMap(metricClass).entrySet()) {
+      generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, jsonMapper);
+    }
+    jsonGenerator.writeEndObject();
+    jsonGenerator.writeEndObject();
+    jsonGenerator.close();
+    return stream.toString(JSON_CHARSET);
+  }
+
+  /**
+   * Dumps JSON-serialized string of all stored metric.
+   * @return dumped JSON string of all metric.
+   * @throws IOException when failed to write json.
+   */
+  public synchronized String dumpAllMetricToJson() throws IOException {
+    final JsonFactory jsonFactory = new JsonFactory();
+    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
+    jsonGenerator.setCodec(jsonMapper);
+
+    jsonGenerator.writeStartObject();
+    for (final Map.Entry<Class, Map<String, Object>> metricMapEntry : metricMap.entrySet()) {
+      jsonGenerator.writeFieldName(metricMapEntry.getKey().getSimpleName());
+      jsonGenerator.writeStartObject();
+      for (final Map.Entry<String, Object> idToMetricEntry : metricMapEntry.getValue().entrySet()) {
+        generatePreprocessedJsonFromMetricEntry(idToMetricEntry, jsonGenerator, jsonMapper);
+      }
+      jsonGenerator.writeEndObject();
+    }
+    jsonGenerator.writeEndObject();
+    jsonGenerator.close();
+    return stream.toString(JSON_CHARSET);
+  }
+
+  /**
+   * Same as dumpAllMetricToJson(), but this will save it to the file.
+   * @param filePath path to dump JSON.
+   */
+  public void dumpAllMetricToFile(final String filePath) {
+    try {
+      final String jsonDump = dumpAllMetricToJson();
+      // try-with-resources closes the file even when the write fails; the file is written as UTF-8.
+      try (final Writer writer = new BufferedWriter(
+          new OutputStreamWriter(new FileOutputStream(filePath), JSON_CHARSET))) {
+        writer.write(jsonDump);
+      }
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Send changed metric data to {@link MetricBroadcaster}, which will broadcast it to
+   * all active WebSocket sessions. This method should be called manually if you want to
+   * send changed metric data to the frontend client. Also this method is synchronized.
+   * @param metricClass class of the metric.
+   * @param id id of the metric.
+   * @param <T> class of metric
+   */
+  public synchronized <T extends Metric> void triggerBroadcast(final Class<T> metricClass, final String id) {
+    final MetricBroadcaster metricBroadcaster = MetricBroadcaster.getInstance();
+    final T metric = getMetricWithId(metricClass, id);
+    final JsonFactory jsonFactory = new JsonFactory();
+    final ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    try {
+      final JsonGenerator jsonGenerator = jsonFactory.createGenerator(stream, JsonEncoding.UTF8);
+      jsonGenerator.setCodec(jsonMapper);
+      jsonGenerator.writeStartObject();
+      jsonGenerator.writeFieldName("metricType");
+      jsonGenerator.writeString(metricClass.getSimpleName());
+      jsonGenerator.writeFieldName("data");
+      jsonGenerator.writeObject(metric);
+      jsonGenerator.writeEndObject();
+      jsonGenerator.close();
+      metricBroadcaster.broadcast(stream.toString(JSON_CHARSET));
+    } catch (final IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
index 6d409d8b3..98928a397 100644
--- a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/RuntimeMaster.java
@@ -26,6 +26,7 @@
 import edu.snu.nemo.runtime.common.message.MessageListener;
 import edu.snu.nemo.runtime.common.plan.PhysicalPlan;
 import edu.snu.nemo.runtime.common.state.TaskState;
+import edu.snu.nemo.runtime.master.servlet.*;
 import edu.snu.nemo.runtime.master.resource.ContainerManager;
 import edu.snu.nemo.runtime.master.resource.ExecutorRepresenter;
 import edu.snu.nemo.runtime.master.resource.ResourceSpecification;
@@ -38,6 +39,8 @@
 import org.apache.reef.driver.evaluator.FailedEvaluator;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Parameter;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,6 +73,7 @@
   private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());
   private static final int DAG_LOGGING_PERIOD = 3000;
   private static final int METRIC_ARRIVE_TIMEOUT = 10000;
+  private static final int REST_SERVER_PORT = 10101;
 
   private final ExecutorService runtimeMasterThread;
 
@@ -78,6 +82,7 @@
   private final BlockManagerMaster blockManagerMaster;
   private final MetricMessageHandler metricMessageHandler;
   private final MessageEnvironment masterMessageEnvironment;
+  private final MetricStore metricStore;
   private final Map<Integer, Long> aggregatedMetricData;
   private final ClientRPC clientRPC;
   private final MetricManagerMaster metricManagerMaster;
@@ -91,6 +96,9 @@
   private final AtomicInteger resourceRequestCount;
 
   private CountDownLatch metricCountDownLatch;
+  // REST API server for web metric visualization ui.
+  private final Server metricServer;
+
 
   @Inject
   public RuntimeMaster(final Scheduler scheduler,
@@ -120,6 +128,29 @@ public RuntimeMaster(final Scheduler scheduler,
     this.resourceRequestCount = new AtomicInteger(0);
     this.objectMapper = new ObjectMapper();
     this.aggregatedMetricData = new HashMap<>();
+    this.metricStore = MetricStore.getStore();
+    this.metricServer = startRestMetricServer();
+  }
+
+  private Server startRestMetricServer() {
+    final Server server = new Server(REST_SERVER_PORT);
+    final ServletHandler servletHandler = new ServletHandler();
+    server.setHandler(servletHandler);
+
+    // One endpoint per metric type, one for all metrics, and one for WebSocket clients.
+    servletHandler.addServletWithMapping(JobMetricServlet.class, "/api/job");
+    servletHandler.addServletWithMapping(TaskMetricServlet.class, "/api/task");
+    servletHandler.addServletWithMapping(StageMetricServlet.class, "/api/stage");
+    servletHandler.addServletWithMapping(AllMetricServlet.class, "/api");
+    servletHandler.addServletWithMapping(WebSocketMetricServlet.class, "/api/websocket");
+
+    try {
+      server.start();
+    } catch (final Exception e) {
+      // Chain the cause so that the actual reason of the failure is not lost.
+      throw new RuntimeException("Failed to start REST API server.", e);
+    }
+    return server;
   }
 
   /**
@@ -172,6 +203,15 @@ public void terminate() {
       metricMessageHandler.terminate();
       containerManager.terminate();
 
+      // TODO #?: parameterize file path using Tang
+      metricStore.dumpAllMetricToFile("/tmp/dump");
+
+      try {
+        metricServer.stop();
+      } catch (final Exception e) {
+        throw new RuntimeException("Failed to stop rest api server.");
+      }
+
     });
 
     // Do not shutdown runtimeMasterThread. We need it to clean things up.
@@ -319,7 +359,9 @@ private void handleControlMessage(final ControlMessage.Message message) {
       case MetricMessageReceived:
         final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
         metricList.forEach(metric ->
-            metricMessageHandler.onMetricMessageReceived(metric.getMetricKey(), metric.getMetricValue()));
+            metricMessageHandler.onMetricMessageReceived(
+                metric.getMetricType(), metric.getMetricId(),
+                metric.getMetricField(), metric.getMetricValue().toByteArray()));
         break;
       case ExecutorDataCollected:
         final String serializedData = message.getDataCollected().getData();
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/AllMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/AllMetricServlet.java
new file mode 100644
index 000000000..00a828e58
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/AllMetricServlet.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which handles total metric request.
+ */
+public final class AllMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    // Answer with a JSON dump of every metric currently held by the store.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType("application/json");
+    response.getWriter().println(MetricStore.getStore().dumpAllMetricToJson());
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/JobMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/JobMetricServlet.java
new file mode 100644
index 000000000..9a38d89eb
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/JobMetricServlet.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+import edu.snu.nemo.runtime.common.metric.JobMetric;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which handles {@link JobMetric} metric request.
+ */
+public final class JobMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    // Answer with a JSON dump of the job metrics currently held by the store.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType("application/json");
+    response.getWriter().println(MetricStore.getStore().dumpMetricToJson(JobMetric.class));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/StageMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/StageMetricServlet.java
new file mode 100644
index 000000000..cb5860ae8
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/StageMetricServlet.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+import edu.snu.nemo.runtime.common.metric.StageMetric;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which handles {@link StageMetric} metric request.
+ */
+public final class StageMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    // Answer with a JSON dump of the stage metrics currently held by the store.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType("application/json");
+    response.getWriter().println(MetricStore.getStore().dumpMetricToJson(StageMetric.class));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/TaskMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/TaskMetricServlet.java
new file mode 100644
index 000000000..4e338fa7e
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/TaskMetricServlet.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricStore;
+import edu.snu.nemo.runtime.common.metric.TaskMetric;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+
+/**
+ * Servlet which handles {@link TaskMetric} metric request.
+ */
+public final class TaskMetricServlet extends HttpServlet {
+
+  @Override
+  protected void doGet(final HttpServletRequest request, final HttpServletResponse response)
+      throws IOException {
+    // Answer with a JSON dump of the task metrics currently held by the store.
+    response.setStatus(HttpServletResponse.SC_OK);
+    response.setContentType("application/json");
+    response.getWriter().println(MetricStore.getStore().dumpMetricToJson(TaskMetric.class));
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricAdapter.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
new file mode 100644
index 000000000..6af2695fa
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricAdapter.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import edu.snu.nemo.runtime.master.MetricBroadcaster;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.WebSocketAdapter;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Adapter for WebSocket metric request and response.
+ */
+public class WebSocketMetricAdapter extends WebSocketAdapter {
+  private static final Logger LOG = LoggerFactory.getLogger(WebSocketMetricAdapter.class.getName());
+  private Session session;
+
+  @Override
+  public final void onWebSocketConnect(final Session sess) {
+    session = sess;
+    MetricBroadcaster.getInstance().addSession(sess);
+  }
+
+  @Override
+  public final void onWebSocketClose(final int statusCode, final String reason) {
+    if (StatusCode.NORMAL != statusCode) {
+      LOG.warn("WebSocket session closed abnormally: {} - {}.", statusCode, reason);
+    }
+    MetricBroadcaster.getInstance().removeSession(session);
+  }
+
+  @Override
+  public final void onWebSocketError(final Throwable throwable) {
+    MetricBroadcaster.getInstance().removeSession(session);
+  }
+
+  @Override
+  public final void onWebSocketText(final String text) {
+    try {
+      session.getRemote().sendString(text);
+    } catch (final IOException ioException) {
+      throw new RuntimeException(ioException);
+    }
+  }
+}
diff --git a/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricServlet.java b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricServlet.java
new file mode 100644
index 000000000..e164501c3
--- /dev/null
+++ b/runtime/master/src/main/java/edu/snu/nemo/runtime/master/servlet/WebSocketMetricServlet.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master.servlet;
+
+import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
+import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
+
+/**
+ * Servlet which handles WebSocket HTTP request.
+ */
+public class WebSocketMetricServlet extends WebSocketServlet {
+
+  @Override
+  public final void configure(final WebSocketServletFactory factory) {
+    // Register the WebSocketMetricAdapter class with the given factory.
+    factory.register(WebSocketMetricAdapter.class);
+  }
+}
diff --git a/runtime/master/src/test/java/edu/snu/nemo/runtime/master/MetricStoreTest.java b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/MetricStoreTest.java
new file mode 100644
index 000000000..54c2028ad
--- /dev/null
+++ b/runtime/master/src/test/java/edu/snu/nemo/runtime/master/MetricStoreTest.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package edu.snu.nemo.runtime.master;
+
+import com.fasterxml.jackson.core.TreeNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import edu.snu.nemo.runtime.common.metric.JobMetric;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test for {@link MetricStore}: checks the layout of the JSON produced by {@code dumpMetricToJson}.
+ */
+public final class MetricStoreTest {
+  @Test
+  public void testJson() throws IOException {
+    final MetricStore metricStore = MetricStore.getStore();
+    // Create the metric first: the assertions below expect the dump to hold an entry keyed by "testId".
+    metricStore.getOrCreateMetric(JobMetric.class, "testId");
+
+    final String json = metricStore.dumpMetricToJson(JobMetric.class);
+    // Walk the parsed dump level by level: "JobMetric" -> "testId" -> "id".
+    final TreeNode treeNode = new ObjectMapper().readTree(json);
+
+    final TreeNode jobMetricNode = treeNode.get("JobMetric");
+    assertNotNull(jobMetricNode);
+
+    final TreeNode metricNode = jobMetricNode.get("testId");
+    assertNotNull(metricNode);
+
+    final TreeNode fieldNode = metricNode.get("id");
+    assertNotNull(fieldNode); // a missing "id" field must fail as an assertion, not as a NullPointerException
+    assertTrue(fieldNode.isValueNode());
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services