Posted to commits@nemo.apache.org by ta...@apache.org on 2021/10/18 12:15:18 UTC

[incubator-nemo] branch master updated: [NEMO-483] Record Metrics associated with stream processing (#317)

This is an automated email from the ASF dual-hosted git repository.

taegeonum pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b242ea  [NEMO-483] Record Metrics associated with stream processing (#317)
5b242ea is described below

commit 5b242ea570a663d5c23877e62171d0e82c4d8f56
Author: Lemarais <go...@gmail.com>
AuthorDate: Mon Oct 18 21:15:13 2021 +0900

    [NEMO-483] Record Metrics associated with stream processing (#317)
    
    JIRA: [NEMO-483: Record Metrics associated with stream processing](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-483)
    
    **Major changes:**
    -
    
    Two types of metrics were added. They can degrade performance, so it is recommended to use them only for debugging purposes.
    
    Each task periodically records how much data it has received (a scheduling sketch follows below).
    - The following metrics are recorded:
    1. the number of processed tuples
    2. the size of the serialized bytes read
    - The recording period can be adjusted through the "stream_metric_period" argument. The unit is milliseconds.
    - If stream_metric_period is not set, this recording is disabled.
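    
    The scheduling follows the pattern the executor uses internally: if the configured period is positive, a single-threaded scheduler fires at a fixed rate; otherwise nothing is scheduled. Below is a minimal, self-contained sketch of that pattern with a hypothetical period value and a placeholder task body; it is an illustration, not the executor code itself.
    
    ```java
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public final class StreamMetricSchedulerSketch {
      public static void main(final String[] args) {
        // Hypothetical value that would arrive through the "stream_metric_period" argument (ms).
        final int streamMetricPeriod = 1_000;
        if (streamMetricPeriod > 0) { // -1 (the default) disables periodic recording.
          final ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
          service.scheduleAtFixedRate(
            // Placeholder body: the real task records processed tuples and serialized read bytes.
            () -> System.out.println("record stream metrics here"),
            0, streamMetricPeriod, TimeUnit.MILLISECONDS);
        }
      }
    }
    ```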
    
    For each task, the traversal time from the source vertex is recorded.
    The source vertex periodically emits a latencymark downstream (a latency-computation sketch follows below).
    - The latencymark carries the following data:
    1. The timestamp when it was created.
    2. The task id where it was created.
    3. The task id it was delivered from, i.e. the task id of the upstream task.
    - When a latencymark reaches a task, it is recorded together with the timestamp at which it arrived at that task.
    - The period for creating latencymarks can be adjusted through the "latencymark_period" argument. The unit is milliseconds.
    - If latencymark_period is not set, latencymarks are disabled.
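    
    Given the fields above, the traversal time at any task is simply the difference between the arrival timestamp recorded in the LatencyMetric and the creation timestamp carried by the latencymark. A minimal sketch using the classes added in this commit (the source task id is hypothetical):
    
    ```java
    import org.apache.nemo.common.punctuation.Latencymark;
    import org.apache.nemo.runtime.common.metric.LatencyMetric;
    
    public final class LatencymarkLatencySketch {
      public static void main(final String[] args) {
        // "Task-source-0" is a hypothetical task id; real marks are created by source tasks.
        final Latencymark mark = new Latencymark("Task-source-0", System.currentTimeMillis());
        // The metric pairs the mark with the timestamp at which it reached the current task.
        final LatencyMetric metric = new LatencyMetric(mark, System.currentTimeMillis() + 42);
        final long traversalMs =
          metric.getTimestamp() - metric.getLatencymark().getCreatedTimestamp();
        System.out.println("Traversal time from source: " + traversalMs + " ms");
      }
    }
    ```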
    
    **Minor changes to note:**
    -
    **Tests for the changes:**
    -
    - Tested on an Ubuntu machine
    
    **Other comments:**
    -
    - There is a gap between when the number of processed tuples increases and when the size of read serialized bytes increases: the size of read serialized bytes grows when data is received, whereas the number of processed tuples grows just before the data is processed, and data fetchers run in a multithreaded environment.
    - When the size of read serialized bytes increases, it is not possible to distinguish whether the bytes belong to data to be processed or to a watermark, so the size of read serialized bytes can be non-zero even when the number of processed tuples is 0.
    - If a task reads data from a local block (because the upstream task is on the same node), the size of read serialized bytes does not increase, since the data is never serialized. Instead, the isReadNotSerializedData field of StreamMetric indicates whether the task read non-serialized data (an interpretation sketch follows below).
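    
    To make the caveats above concrete, here is a hedged sketch of how a consumer might read one StreamMetric sample, checking isReadNotSerializedData before trusting the byte counter; the sample values are made up for illustration.
    
    ```java
    import org.apache.nemo.runtime.common.metric.StreamMetric;
    
    public final class StreamMetricReaderSketch {
      public static void main(final String[] args) {
        final long end = System.currentTimeMillis();
        final long start = end - 1_000;
        // Hypothetical sample: 128 tuples and 4096 serialized bytes read over one second.
        final StreamMetric sample = new StreamMetric(start, end, 128L, 4_096L, false);
    
        final double tuplesPerSec = sample.getNumOfProcessedTuples()
          * 1_000.0 / (sample.getEndTimeStamp() - sample.getStartTimeStamp());
        System.out.println("throughput: " + tuplesPerSec + " tuples/s");
    
        if (sample.getIsReadNotSerializedData()) {
          // Local-block read: the data was never serialized, so the byte counter stays at 0.
          System.out.println("serialized byte count not meaningful for this sample");
        } else {
          System.out.println("serialized bytes read: " + sample.getSerializedReadBytes());
        }
      }
    }
    ```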
    
    
    Closes #317
---
 .../java/org/apache/nemo/client/JobLauncher.java   |   2 +
 .../org/apache/nemo/common/ir/OutputCollector.java |   8 ++
 ...ransform.java => LatencymarkEmitTransform.java} |  30 +++--
 .../vertex/transform/NoWatermarkEmitTransform.java |   5 +
 .../ir/vertex/transform/StreamTransform.java       |  13 +-
 .../nemo/common/ir/vertex/transform/Transform.java |   8 ++
 .../nemo/common/punctuation/Latencymark.java       | 109 +++++++++++++++++
 .../beam/transform/AbstractDoFnTransform.java      |  12 ++
 .../beam/transform/CreateViewTransform.java        |  19 +--
 .../frontend/beam/transform/FlattenTransform.java  |  15 +--
 .../frontend/beam/transform/GBKTransform.java      |   7 ++
 .../beam/transform/SideInputTransform.java         |  16 +--
 .../frontend/beam/transform/WindowFnTransform.java |  15 +--
 .../frontend/spark/transform/FlatMapTransform.java |  15 +--
 .../spark/transform/MapToPairTransform.java        |  15 +--
 .../frontend/spark/transform/MapTransform.java     |  15 +--
 .../frontend/spark/transform/ReduceTransform.java  |  11 +-
 .../beam/transform/TestOutputCollector.java        |   7 ++
 .../main/java/org/apache/nemo/conf/JobConf.java    |  23 ++++
 .../nemo/runtime/common/metric/LatencyMetric.java  |  61 ++++++++++
 .../nemo/runtime/common/metric/StreamMetric.java   |  97 +++++++++++++++
 .../nemo/runtime/common/metric/TaskMetric.java     |  42 ++++++-
 .../java/org/apache/nemo/driver/NemoDriver.java    |   8 ++
 .../org/apache/nemo/runtime/executor/Executor.java |  32 ++++-
 .../nemo/runtime/executor/data/DataUtil.java       |  80 ++++++++++++-
 .../executor/datatransfer/BlockOutputWriter.java   |   6 +
 .../datatransfer/DataFetcherOutputCollector.java   |   6 +
 .../datatransfer/NemoEventDecoderFactory.java      |  14 ++-
 .../datatransfer/NemoEventEncoderFactory.java      |   4 +
 .../OperatorVertexOutputCollector.java             |  30 +++++
 .../datatransfer/OperatorWatermarkCollector.java   |   6 +
 .../executor/datatransfer/OutputWriter.java        |  10 ++
 .../executor/datatransfer/PipeOutputWriter.java    |  10 ++
 .../RunTimeMessageOutputCollector.java             |   6 +
 .../task/MultiThreadParentTaskDataFetcher.java     |  31 +++++
 .../executor/task/ParentTaskDataFetcher.java       |  19 +++
 .../executor/task/SourceVertexDataFetcher.java     |  43 +++++--
 .../nemo/runtime/executor/task/TaskExecutor.java   | 133 ++++++++++++++++++---
 .../runtime/executor/task/TaskExecutorTest.java    |  38 +++++-
 39 files changed, 878 insertions(+), 143 deletions(-)

diff --git a/client/src/main/java/org/apache/nemo/client/JobLauncher.java b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
index acd9a22..b4ded3e 100644
--- a/client/src/main/java/org/apache/nemo/client/JobLauncher.java
+++ b/client/src/main/java/org/apache/nemo/client/JobLauncher.java
@@ -431,6 +431,8 @@ public final class JobLauncher {
     cl.registerShortNameOfClass(JobConf.DriverMemMb.class);
     cl.registerShortNameOfClass(JobConf.ExecutorJSONPath.class);
     cl.registerShortNameOfClass(JobConf.BandwidthJSONPath.class);
+    cl.registerShortNameOfClass(JobConf.StreamMetricPeriod.class);
+    cl.registerShortNameOfClass(JobConf.LatencyMarkPeriod.class);
     cl.registerShortNameOfClass(JobConf.JVMHeapSlack.class);
     cl.registerShortNameOfClass(JobConf.IORequestHandleThreadsTotal.class);
     cl.registerShortNameOfClass(JobConf.MaxTaskAttempt.class);
diff --git a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
index 0865093..916a06f 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/OutputCollector.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.common.ir;
 
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.io.Serializable;
@@ -45,6 +46,13 @@ public interface OutputCollector<O> extends Serializable {
   void emitWatermark(Watermark watermark);
 
   /**
+   * Emit latencymark to downstream vertices.
+   *
+   * @param latencymark latencymark
+   */
+  void emitLatencymark(Latencymark latencymark);
+
+  /**
    * Multi-destination emit.
    * Currently unused, but might come in handy
    * for operations like multi-output map.
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
similarity index 55%
copy from common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
copy to common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
index 549a3f1..44640a1 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/LatencymarkEmitTransform.java
@@ -18,23 +18,39 @@
  */
 package org.apache.nemo.common.ir.vertex.transform;
 
-import org.apache.nemo.common.punctuation.Watermark;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Latencymark;
 
 /**
- * This transform does not emit watermarks.
- * It may be a transform for batch operation that emits collected data when calling {@link Transform#close()}.
+ * This transform emits {@link Latencymark}.
  *
  * @param <I> input type
  * @param <O> output type
  */
-public abstract class NoWatermarkEmitTransform<I, O> implements Transform<I, O> {
+public abstract class LatencymarkEmitTransform<I, O> implements Transform<I, O> {
+  private OutputCollector<O> outputCollector;
 
   /**
-   * @param watermark watermark
+   * @param context context for data transfer.
+   * @param oc OutputCollector to transfer data.
    */
   @Override
-  public final void onWatermark(final Watermark watermark) {
-    // do nothing
+  public void prepare(final Context context, final OutputCollector<O> oc) {
+    this.outputCollector = oc;
   }
 
+  /**
+   * get OutputCollector.
+   */
+  public OutputCollector<O> getOutputCollector() {
+    return outputCollector;
+  }
+
+  /**
+   * @param latencymark latencymark
+   */
+  @Override
+  public final void onLatencymark(final Latencymark latencymark) {
+    outputCollector.emitLatencymark(latencymark);
+  }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
index 549a3f1..7f8b9d1 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/NoWatermarkEmitTransform.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.common.ir.vertex.transform;
 
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 /**
@@ -37,4 +38,8 @@ public abstract class NoWatermarkEmitTransform<I, O> implements Transform<I, O>
     // do nothing
   }
 
+  @Override
+  public final void onLatencymark(final Latencymark latencymark) {
+    // do nothing
+  }
 }
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
index 9781a8d..e812a9d 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/StreamTransform.java
@@ -18,7 +18,6 @@
  */
 package org.apache.nemo.common.ir.vertex.transform;
 
-import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,8 +28,7 @@ import org.slf4j.LoggerFactory;
  *
  * @param <T> input/output type.
  */
-public final class StreamTransform<T> implements Transform<T, T> {
-  private OutputCollector<T> outputCollector;
+public final class StreamTransform<T> extends LatencymarkEmitTransform<T, T> {
   private static final Logger LOG = LoggerFactory.getLogger(StreamTransform.class.getName());
 
   /**
@@ -41,18 +39,13 @@ public final class StreamTransform<T> implements Transform<T, T> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final T element) {
-    outputCollector.emit(element);
+    getOutputCollector().emit(element);
   }
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
index a4787e1..d16ab5c 100644
--- a/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
+++ b/common/src/main/java/org/apache/nemo/common/ir/vertex/transform/Transform.java
@@ -19,6 +19,7 @@
 package org.apache.nemo.common.ir.vertex.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.io.Serializable;
@@ -58,6 +59,13 @@ public interface Transform<I, O> extends Serializable {
   void onWatermark(Watermark watermark);
 
   /**
+   * On latencymark received.
+   *
+   * @param latencymark latencymark.
+   */
+  void onLatencymark(Latencymark latencymark);
+
+  /**
    * Close the transform.
    */
   void close();
diff --git a/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java b/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
new file mode 100644
index 0000000..a331e19
--- /dev/null
+++ b/common/src/main/java/org/apache/nemo/common/punctuation/Latencymark.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.common.punctuation;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Latency mark is conveyor that has data for debugging.
+ * It is created only from source vertex and record the timestamp when it is created and taskId where it is created.
+ */
+public final class Latencymark implements Serializable {
+  private final String createdTaskId;
+  private final long createdTimestamp;
+  private String previousTaskId;
+  private long previousSentTimestamp;
+
+
+  /**
+   * @param taskId task id where it is created
+   * @param timestamp timestamp when it is created
+   */
+  public Latencymark(final String taskId, final long timestamp) {
+    this.createdTaskId = taskId;
+    this.createdTimestamp = timestamp;
+    this.previousTaskId = "";
+    this.previousSentTimestamp = 0;
+  }
+
+  /**
+   * @return the latencymark timestamp
+   */
+  public long getCreatedTimestamp() {
+    return createdTimestamp;
+  }
+
+  /**
+   * @return the task id where it is created
+   */
+  public String getCreatedTaskId() {
+    return createdTaskId;
+  }
+
+  /**
+   * @return the task id of previous task
+   */
+  public String getPreviousTaskId() {
+    return previousTaskId;
+  }
+
+  /**
+   * Set the previousTaskId.
+   *
+   * @param taskId the task id.
+   */
+  public void setPreviousTaskId(final String taskId) {
+    previousTaskId = taskId;
+  }
+
+  /**
+   * Set the previousSentTimestamp.
+   *
+   * @param timestamp the timestamp.
+   */
+  public void setPreviousSentTimestamp(final long timestamp) {
+    previousSentTimestamp = timestamp;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    final Latencymark latencymark = (Latencymark) o;
+    return (createdTimestamp == latencymark.createdTimestamp)
+      && (createdTaskId.equals(latencymark.createdTaskId)
+      && (previousTaskId.equals(latencymark.previousTaskId)));
+  }
+
+
+  @Override
+  public String toString() {
+    return "Latencymark(" + createdTaskId + ", " + createdTimestamp + ")";
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(createdTimestamp);
+  }
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 59a3b1b..8e03d29 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.compiler.frontend.beam.InMemorySideInputReader;
 import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 import org.slf4j.Logger;
@@ -313,6 +314,17 @@ public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
     return outputCollector;
   }
 
+  /**
+   * On latencymark received.
+   *
+   * @param latencymark latencymark.
+   */
+  @Override
+  public void onLatencymark(final Latencymark latencymark) {
+    getOutputCollector().emitLatencymark(latencymark);
+  }
+
+
   @Override
   public final void close() {
     beforeClose();
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
index f5d59e3..3ef777d 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/CreateViewTransform.java
@@ -24,8 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.KV;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import javax.annotation.Nullable;
@@ -38,12 +37,11 @@ import java.util.*;
  * @param <I> input type
  * @param <O> materialized output type
  */
-public final class CreateViewTransform<I, O> implements Transform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
+public final class CreateViewTransform<I, O>
+  extends LatencymarkEmitTransform<WindowedValue<KV<?, I>>, WindowedValue<O>> {
   private final ViewFn<Materializations.MultimapView<Void, ?>, O> viewFn;
   private final Map<BoundedWindow, List<I>> windowListMap;
 
-  private OutputCollector<WindowedValue<O>> outputCollector;
-
   private long currentOutputWatermark;
 
   /**
@@ -58,11 +56,6 @@ public final class CreateViewTransform<I, O> implements Transform<WindowedValue<
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<WindowedValue<O>> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final WindowedValue<KV<?, I>> element) {
     // The key of element is always null (beam's semantic)
     // because view is a globally materialized data regardless of key
@@ -78,7 +71,7 @@ public final class CreateViewTransform<I, O> implements Transform<WindowedValue<
     // If no data, just forwards the watermark
     if (windowListMap.size() == 0 && currentOutputWatermark < inputWatermark.getTimestamp()) {
       currentOutputWatermark = inputWatermark.getTimestamp();
-      outputCollector.emitWatermark(inputWatermark);
+      getOutputCollector().emitWatermark(inputWatermark);
       return;
     }
 
@@ -90,7 +83,7 @@ public final class CreateViewTransform<I, O> implements Transform<WindowedValue<
       if (entry.getKey().maxTimestamp().getMillis() <= inputWatermark.getTimestamp()) {
         // emit the windowed data if the watermark timestamp > the window max boundary
         final O output = viewFn.apply(new MultiView<>(entry.getValue()));
-        outputCollector.emit(WindowedValue.of(
+        getOutputCollector().emit(WindowedValue.of(
           output, entry.getKey().maxTimestamp(), entry.getKey(), PaneInfo.ON_TIME_AND_ONLY_FIRING));
         iterator.remove();
 
@@ -103,7 +96,7 @@ public final class CreateViewTransform<I, O> implements Transform<WindowedValue<
       && currentOutputWatermark < minOutputTimestampOfEmittedWindows) {
       // update current output watermark and emit to next operators
       currentOutputWatermark = minOutputTimestampOfEmittedWindows;
-      outputCollector.emitWatermark(new Watermark(currentOutputWatermark));
+      getOutputCollector().emitWatermark(new Watermark(currentOutputWatermark));
     }
   }
 
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
index fed0315..61e6990 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/FlattenTransform.java
@@ -18,8 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 
 /**
@@ -27,8 +26,7 @@ import org.apache.nemo.common.punctuation.Watermark;
  *
  * @param <T> input/output type.
  */
-public final class FlattenTransform<T> implements Transform<T, T> {
-  private OutputCollector<T> outputCollector;
+public final class FlattenTransform<T> extends LatencymarkEmitTransform<T, T> {
 
   /**
    * FlattenTransform Constructor.
@@ -38,18 +36,13 @@ public final class FlattenTransform<T> implements Transform<T, T> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final T element) {
-    outputCollector.emit(element);
+    getOutputCollector().emit(element);
   }
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
index 4aa366f..9194edb 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -299,6 +300,12 @@ public final class GBKTransform<K, InputT, OutputT>
       oc.emit(output);
     }
 
+    /** Emit latencymark. */
+    @Override
+    public final void emitLatencymark(final Latencymark latencymark) {
+      oc.emitLatencymark(latencymark);
+    }
+
     /** Emit watermark. */
     @Override
     public final void emitWatermark(final Watermark watermark) {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
index 4758066..4ff2aa6 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/SideInputTransform.java
@@ -19,8 +19,7 @@
 package org.apache.nemo.compiler.frontend.beam.transform;
 
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.compiler.frontend.beam.SideInputElement;
 
@@ -30,8 +29,8 @@ import org.apache.nemo.compiler.frontend.beam.SideInputElement;
  *
  * @param <T> input/output type.
  */
-public final class SideInputTransform<T> implements Transform<WindowedValue<T>, WindowedValue<SideInputElement<T>>> {
-  private OutputCollector<WindowedValue<SideInputElement<T>>> outputCollector;
+public final class SideInputTransform<T>
+  extends LatencymarkEmitTransform<WindowedValue<T>, WindowedValue<SideInputElement<T>>> {
   private final int index;
 
   /**
@@ -44,18 +43,13 @@ public final class SideInputTransform<T> implements Transform<WindowedValue<T>,
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<WindowedValue<SideInputElement<T>>> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final WindowedValue<T> element) {
-    outputCollector.emit(element.withValue(new SideInputElement<>(index, element.getValue())));
+    getOutputCollector().emit(element.withValue(new SideInputElement<>(index, element.getValue())));
   }
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
index 14ea5ae..ca93ad5 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/WindowFnTransform.java
@@ -24,8 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.joda.time.Instant;
 
@@ -40,10 +39,9 @@ import java.util.Collection;
  * @param <W> window type
  */
 public final class WindowFnTransform<T, W extends BoundedWindow>
-  implements Transform<WindowedValue<T>, WindowedValue<T>> {
+  extends LatencymarkEmitTransform<WindowedValue<T>, WindowedValue<T>> {
   private final WindowFn windowFn;
   private final DisplayData displayData;
-  private OutputCollector<WindowedValue<T>> outputCollector;
 
   /**
    * Default Constructor.
@@ -57,11 +55,6 @@ public final class WindowFnTransform<T, W extends BoundedWindow>
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<WindowedValue<T>> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final WindowedValue<T> windowedValue) {
     final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows());
     final T element = windowedValue.getValue();
@@ -89,7 +82,7 @@ public final class WindowFnTransform<T, W extends BoundedWindow>
             });
 
       // Emit compressed windows for efficiency
-      outputCollector.emit(WindowedValue.of(element, timestamp, windows, PaneInfo.NO_FIRING));
+      getOutputCollector().emit(WindowedValue.of(element, timestamp, windows, PaneInfo.NO_FIRING));
     } catch (final Exception e) {
       throw new RuntimeException(e);
     }
@@ -97,7 +90,7 @@ public final class WindowFnTransform<T, W extends BoundedWindow>
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
index 1ddb53e..d9ad4e5 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/FlatMapTransform.java
@@ -18,8 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.transform;
 
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.FlatMapFunction;
 
@@ -29,9 +28,8 @@ import org.apache.spark.api.java.function.FlatMapFunction;
  * @param <T> input type.
  * @param <U> output type.
  */
-public final class FlatMapTransform<T, U> implements Transform<T, U> {
+public final class FlatMapTransform<T, U> extends LatencymarkEmitTransform<T, U> {
   private final FlatMapFunction<T, U> func;
-  private OutputCollector<U> outputCollector;
 
   /**
    * Constructor.
@@ -43,14 +41,9 @@ public final class FlatMapTransform<T, U> implements Transform<T, U> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<U> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final T element) {
     try {
-      func.call(element).forEachRemaining(outputCollector::emit);
+      func.call(element).forEachRemaining(getOutputCollector()::emit);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -58,7 +51,7 @@ public final class FlatMapTransform<T, U> implements Transform<T, U> {
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
index daa3f0a..8545205 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapToPairTransform.java
@@ -18,8 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.transform;
 
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.PairFunction;
 import scala.Tuple2;
@@ -31,9 +30,8 @@ import scala.Tuple2;
  * @param <K> output key type.
  * @param <V> output value type.
  */
-public final class MapToPairTransform<T, K, V> implements Transform<T, Tuple2<K, V>> {
+public final class MapToPairTransform<T, K, V> extends LatencymarkEmitTransform<T, Tuple2<K, V>> {
   private final PairFunction<T, K, V> func;
-  private OutputCollector<Tuple2<K, V>> outputCollector;
 
   /**
    * Constructor.
@@ -45,15 +43,10 @@ public final class MapToPairTransform<T, K, V> implements Transform<T, Tuple2<K,
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<Tuple2<K, V>> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final T element) {
     try {
       Tuple2<K, V> data = func.call(element);
-      outputCollector.emit(data);
+      getOutputCollector().emit(data);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -61,7 +54,7 @@ public final class MapToPairTransform<T, K, V> implements Transform<T, Tuple2<K,
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
index 6a98e71..3a15bdd 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/MapTransform.java
@@ -18,8 +18,7 @@
  */
 package org.apache.nemo.compiler.frontend.spark.transform;
 
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.Function;
 
@@ -29,9 +28,8 @@ import org.apache.spark.api.java.function.Function;
  * @param <I> input type.
  * @param <O> output type.
  */
-public final class MapTransform<I, O> implements Transform<I, O> {
+public final class MapTransform<I, O> extends LatencymarkEmitTransform<I, O> {
   private final Function<I, O> func;
-  private OutputCollector<O> outputCollector;
 
   /**
    * Constructor.
@@ -43,14 +41,9 @@ public final class MapTransform<I, O> implements Transform<I, O> {
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<O> oc) {
-    this.outputCollector = oc;
-  }
-
-  @Override
   public void onData(final I element) {
     try {
-      outputCollector.emit(func.call(element));
+      getOutputCollector().emit(func.call(element));
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
@@ -58,7 +51,7 @@ public final class MapTransform<I, O> implements Transform<I, O> {
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   @Override
diff --git a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
index 3c5e2d7..a114530 100644
--- a/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
+++ b/compiler/frontend/spark/src/main/java/org/apache/nemo/compiler/frontend/spark/transform/ReduceTransform.java
@@ -19,7 +19,7 @@
 package org.apache.nemo.compiler.frontend.spark.transform;
 
 import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.ir.vertex.transform.LatencymarkEmitTransform;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.spark.api.java.function.Function2;
 
@@ -31,9 +31,8 @@ import java.util.Iterator;
  *
  * @param <T> element type.
  */
-public final class ReduceTransform<T> implements Transform<T, T> {
+public final class ReduceTransform<T> extends LatencymarkEmitTransform<T, T> {
   private final Function2<T, T, T> func;
-  private OutputCollector<T> outputCollector;
   // TODO #431: Handle states in Transforms better
   private T result;
 
@@ -49,7 +48,7 @@ public final class ReduceTransform<T> implements Transform<T, T> {
 
   @Override
   public void prepare(final Context context, final OutputCollector<T> oc) {
-    this.outputCollector = oc;
+    super.prepare(context, oc);
     this.result = null;
   }
 
@@ -69,12 +68,12 @@ public final class ReduceTransform<T> implements Transform<T, T> {
       throw new RuntimeException(e);
     }
 
-    outputCollector.emit(result);
+    getOutputCollector().emit(result);
   }
 
   @Override
   public void onWatermark(final Watermark watermark) {
-    outputCollector.emitWatermark(watermark);
+    getOutputCollector().emitWatermark(watermark);
   }
 
   /**
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
index c9c9fb0..914ccf5 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/TestOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.compiler.frontend.beam.transform;
 
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.reef.io.Tuple;
 
@@ -35,11 +36,13 @@ final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>>
   public final List<WindowedValue<T>> outputs;
   public final List<Tuple<String, WindowedValue<T>>> taggedOutputs;
   public final List<Watermark> watermarks;
+  public final List<Latencymark> latencymarks;
 
   TestOutputCollector() {
     this.outputs = new LinkedList<>();
     this.taggedOutputs = new LinkedList<>();
     this.watermarks = new LinkedList<>();
+    this.latencymarks = new LinkedList<>();
   }
 
   @Override
@@ -52,6 +55,10 @@ final class TestOutputCollector<T> implements OutputCollector<WindowedValue<T>>
     watermarks.add(watermark);
   }
 
+  @Override
+  public void emitLatencymark(Latencymark latencymark) {
+    latencymarks.add(latencymark);
+  }
 
   @Override
   public <O> void emit(String dstVertexId, O output) {
diff --git a/conf/src/main/java/org/apache/nemo/conf/JobConf.java b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
index b7ce273..c88e560 100644
--- a/conf/src/main/java/org/apache/nemo/conf/JobConf.java
+++ b/conf/src/main/java/org/apache/nemo/conf/JobConf.java
@@ -250,6 +250,25 @@ public final class JobConf extends ConfigurationModuleBuilder {
   public final class ExecutorJSONContents implements Name<String> {
   }
 
+  ///////////////////////// Metric Configurations
+  /**
+   * Period how often stream metrics are recorded. the unit of period is millisecond.
+   * -1 indicates that metrics are not recorded periodically.
+   */
+  @NamedParameter(doc = "Period how often stream-related metrics are recorded. The unit of period is millisecond.",
+    short_name = "stream_metric_period", default_value = "-1")
+  public final class StreamMetricPeriod implements Name<Integer> {
+  }
+
+  /**
+   * Period how often latencymarks are sent from source vertex. the unit of period is millisecond.
+   * -1 indicates that latencymarks are not sent.
+   */
+  @NamedParameter(doc = "Period how often latencymarks are sent from source vertex. The unit of period is millisecond.",
+    short_name = "latencymark_period", default_value = "-1")
+  public final class LatencyMarkPeriod implements Name<Integer> {
+  }
+
   //////////////////////////////// Runtime Data Plane Configurations
 
   /**
@@ -336,6 +355,8 @@ public final class JobConf extends ConfigurationModuleBuilder {
 
   public static final RequiredParameter<String> EXECUTOR_ID = new RequiredParameter<>();
   public static final RequiredParameter<String> JOB_ID = new RequiredParameter<>();
+  public static final OptionalParameter<Integer> STREAM_METRIC_PERIOD = new OptionalParameter<>();
+  public static final OptionalParameter<Integer> LATENCYMARK_PERIOD = new OptionalParameter<>();
   public static final OptionalParameter<String> LOCAL_DISK_DIRECTORY = new OptionalParameter<>();
   public static final OptionalParameter<String> GLUSTER_DISK_DIRECTORY = new OptionalParameter<>();
 
@@ -344,5 +365,7 @@ public final class JobConf extends ConfigurationModuleBuilder {
     .bindNamedParameter(JobId.class, JOB_ID)
     .bindNamedParameter(FileDirectory.class, LOCAL_DISK_DIRECTORY)
     .bindNamedParameter(GlusterVolumeDirectory.class, GLUSTER_DISK_DIRECTORY)
+    .bindNamedParameter(StreamMetricPeriod.class, STREAM_METRIC_PERIOD)
+    .bindNamedParameter(LatencyMarkPeriod.class, LATENCYMARK_PERIOD)
     .build();
 }
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
new file mode 100644
index 0000000..a5fb93e
--- /dev/null
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/LatencyMetric.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import org.apache.nemo.common.punctuation.Latencymark;
+
+import java.io.Serializable;
+
+/**
+ * Metric class for recording latencymark and the time when the latencymark is recorded.
+ * The traversal time can be calculated by comparing the time when the latencymark was created with the time recorded.
+ */
+public final class LatencyMetric implements Serializable {
+  private final Latencymark latencymark;
+  private final long timestamp;
+
+  /**
+   * Constructor with the latencymark and timestamp.
+   *
+   * @param latencymark the latencymark to record.
+   * @param timestamp When the latencymark was received.
+   */
+  public LatencyMetric(final Latencymark latencymark, final long timestamp) {
+    this.latencymark = latencymark;
+    this.timestamp = timestamp;
+  }
+
+  /**
+   * Get the recorded latency mark.
+   *
+   * @return latency mark.
+   */
+  public Latencymark getLatencymark() {
+    return latencymark;
+  }
+
+  /**
+   * Get the timestamp when the latencymark is received.
+   *
+   * @return timestamp when it is received.
+   */
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java
new file mode 100644
index 0000000..563c5fb
--- /dev/null
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/StreamMetric.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nemo.runtime.common.metric;
+
+import java.io.Serializable;
+
+/**
+ * Metrics associated with stream. It is periodically recorded.
+ */
+public final class StreamMetric implements Serializable {
+  private final long startTimeStamp;
+  private final long endTimeStamp;
+  private final long numOfReadTuples;
+  private final boolean isReadNotSerializedData;
+  private final long serializedReadBytes;
+
+
+  /**
+   * @param startTimeStamp the starting point from which metric is recorded.
+   * @param endTimeStamp the endpoint from which metric is recorded.
+   * @param numOfTuples the number of tuples processed between starting point and endpoint.
+   * @param serializedReadBytes the number of read bytes processed between starting point and endpoint.
+   * @param isReadNotSerializedData whether the task read data from local block or not.
+   */
+  public StreamMetric(final long startTimeStamp,
+                      final long endTimeStamp,
+                      final long numOfTuples,
+                      final long serializedReadBytes,
+                      final boolean isReadNotSerializedData) {
+    this.startTimeStamp = startTimeStamp;
+    this.endTimeStamp = endTimeStamp;
+    this.numOfReadTuples = numOfTuples;
+    this.isReadNotSerializedData = isReadNotSerializedData;
+    this.serializedReadBytes = serializedReadBytes;
+  }
+
+  /**
+   * Get starting point of record period.
+   *
+   * @return start timestamp.
+   */
+  public long getStartTimeStamp() {
+    return startTimeStamp;
+  }
+
+  /**
+   * Get endpoint of record period.
+   *
+   * @return end timestamp.
+   */
+  public long getEndTimeStamp() {
+    return endTimeStamp;
+  }
+
+  /**
+   * Get the number of processed tuple.
+   *
+   * @return number of tuples.
+   */
+  public long getNumOfProcessedTuples() {
+    return numOfReadTuples;
+  }
+
+  /**
+   * Get the number of read bytes.
+   *
+   * @return number of read bytes.
+   */
+  public long getSerializedReadBytes() {
+    return serializedReadBytes;
+  }
+
+  /**
+   * Get a boolean value that indicates whether it read data from local block or not.
+   *
+   * @return a boolean value that indicates whether it read data form local block or not.
+   */
+  public boolean getIsReadNotSerializedData() {
+    return isReadNotSerializedData;
+  }
+}
diff --git a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
index 531e715..5521530 100644
--- a/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
+++ b/runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java
@@ -23,8 +23,7 @@ import org.apache.nemo.runtime.common.state.TaskState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
 
 /**
  * Metric class for {@link org.apache.nemo.runtime.common.plan.Task}.
@@ -34,6 +33,17 @@ public class TaskMetric implements StateMetric<TaskState.State> {
   private String containerId = "";
   private int scheduleAttempt = -1;
   private List<StateTransitionEvent<TaskState.State>> stateTransitionEvents = new ArrayList<>();
+
+  /**
+   * Source Vertex Id of data fetcher to List of stream metrics.
+   */
+  private final Map<String, List<StreamMetric>> streamMetrics = new HashMap<>();
+
+  /**
+   * Source Vertex Id of data fetcher to List of latency metrics.
+   */
+  private final Map<String, List<LatencyMetric>> latencymarks = new HashMap<>();
+
   private long taskDuration = -1;
   private long taskCPUTime = -1;
   private long schedulingOverhead = -1;
@@ -107,6 +117,28 @@ public class TaskMetric implements StateMetric<TaskState.State> {
   }
 
   /**
+   * Method related to stream metric.
+   *
+   * @return the streamMetrics
+   */
+  public final Map<String, List<StreamMetric>> getStreamMetric() {
+    return streamMetrics;
+  }
+
+  private void setStreamMetric(final Map<String, StreamMetric> streamMetricMap) {
+    for (String sourceVertexId : streamMetricMap.keySet()) {
+      StreamMetric streamMetric = streamMetricMap.get(sourceVertexId);
+      streamMetrics.putIfAbsent(sourceVertexId, new LinkedList<>());
+      streamMetrics.get(sourceVertexId).add(streamMetric);
+    }
+  }
+
+  private void addLatencymark(final LatencyMetric latencyMetric) {
+    latencymarks.putIfAbsent(latencyMetric.getLatencymark().getPreviousTaskId(), new LinkedList<>());
+    latencymarks.get(latencyMetric.getLatencymark().getPreviousTaskId()).add(latencyMetric);
+  }
+
+  /**
    * Method related to task CPU time.
    */
   public final long getTaskCPUTime() {
@@ -261,6 +293,12 @@ public class TaskMetric implements StateMetric<TaskState.State> {
   public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
     LOG.debug("metric {} has just arrived!", metricField);
     switch (metricField) {
+      case "streamMetric":
+        setStreamMetric(SerializationUtils.deserialize(metricValue));
+        break;
+      case "latencymark":
+        addLatencymark(SerializationUtils.deserialize(metricValue));
+        break;
       case "taskDuration":
         setTaskDuration(SerializationUtils.deserialize(metricValue));
         break;
diff --git a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
index 3b43abd..b22ee05 100644
--- a/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
+++ b/runtime/driver/src/main/java/org/apache/nemo/driver/NemoDriver.java
@@ -78,6 +78,8 @@ public final class NemoDriver {
   private final UserApplicationRunner userApplicationRunner;
   private final RuntimeMaster runtimeMaster;
   private final String jobId;
+  private final Integer streamMetricPeriod;
+  private final Integer latencyMarkPeriod;
   private final String localDirectory;
   private final String glusterDirectory;
   private final ClientRPC clientRPC;
@@ -100,6 +102,8 @@ public final class NemoDriver {
                      @Parameter(JobConf.ExecutorJSONContents.class) final String resourceSpecificationString,
                      @Parameter(JobConf.BandwidthJSONContents.class) final String bandwidthString,
                      @Parameter(JobConf.JobId.class) final String jobId,
+                     @Parameter(JobConf.StreamMetricPeriod.class) final int streamMetricPeriod,
+                     @Parameter(JobConf.LatencyMarkPeriod.class) final int latencyMarkPeriod,
                      @Parameter(JobConf.FileDirectory.class) final String localDirectory,
                      @Parameter(JobConf.GlusterVolumeDirectory.class) final String glusterDirectory) {
     IdManager.setInDriver();
@@ -109,6 +113,8 @@ public final class NemoDriver {
     this.localAddressProvider = localAddressProvider;
     this.resourceSpecificationString = resourceSpecificationString;
     this.jobId = jobId;
+    this.streamMetricPeriod = streamMetricPeriod;
+    this.latencyMarkPeriod = latencyMarkPeriod;
     this.localDirectory = localDirectory;
     this.glusterDirectory = glusterDirectory;
     this.handler = new RemoteClientMessageLoggingHandler(client);
@@ -250,6 +256,8 @@ public final class NemoDriver {
   private Configuration getExecutorConfiguration(final String executorId) {
     final Configuration executorConfiguration = JobConf.EXECUTOR_CONF
       .set(JobConf.EXECUTOR_ID, executorId)
+      .set(JobConf.STREAM_METRIC_PERIOD, streamMetricPeriod)
+      .set(JobConf.LATENCYMARK_PERIOD, latencyMarkPeriod)
       .set(JobConf.GLUSTER_DISK_DIRECTORY, glusterDirectory)
       .set(JobConf.LOCAL_DISK_DIRECTORY, localDirectory)
       .set(JobConf.JOB_ID, jobId)
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
index 75c2e43..000318b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java
@@ -53,8 +53,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
 
 /**
  * Executor.
@@ -63,11 +64,13 @@ public final class Executor {
   private static final Logger LOG = LoggerFactory.getLogger(Executor.class.getName());
 
   private final String executorId;
+  private final int latencyMarkSendPeriod;
 
   /**
    * To be used for a thread pool to execute tasks.
    */
   private final ExecutorService executorService;
+  private ScheduledExecutorService periodicMetricService = null;
 
   /**
    * In charge of this executor's intermediate data transfer.
@@ -85,8 +88,12 @@ public final class Executor {
 
   private final MetricMessageSender metricMessageSender;
 
+  private final List<TaskExecutor> taskExecutorList = new ArrayList<>();
+
   @Inject
   private Executor(@Parameter(JobConf.ExecutorId.class) final String executorId,
+                   @Parameter(JobConf.StreamMetricPeriod.class) final int streamMetricRecordPeriod,
+                   @Parameter(JobConf.LatencyMarkPeriod.class) final int latencyMarkSendPeriod,
                    final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
                    final MessageEnvironment messageEnvironment,
                    final SerializerManager serializerManager,
@@ -94,6 +101,7 @@ public final class Executor {
                    final BroadcastManagerWorker broadcastManagerWorker,
                    final MetricManagerWorker metricMessageSender) {
     this.executorId = executorId;
+    this.latencyMarkSendPeriod = latencyMarkSendPeriod;
     this.executorService = Executors.newCachedThreadPool(new BasicThreadFactory.Builder()
       .namingPattern("TaskExecutor thread-%d")
       .build());
@@ -103,6 +111,17 @@ public final class Executor {
     this.broadcastManagerWorker = broadcastManagerWorker;
     this.metricMessageSender = metricMessageSender;
     messageEnvironment.setupListener(MessageEnvironment.EXECUTOR_MESSAGE_LISTENER_ID, new ExecutorMessageReceiver());
+
+    // set the interval for recording stream metric
+    if (streamMetricRecordPeriod > 0) {
+      this.periodicMetricService = Executors.newScheduledThreadPool(1);
+      this.periodicMetricService.scheduleAtFixedRate(
+        this::sendStreamMetric, 0, streamMetricRecordPeriod, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  private void sendStreamMetric() {
+    taskExecutorList.forEach(TaskExecutor::sendStreamMetric);
   }
 
   public String getExecutorId() {
@@ -148,8 +167,10 @@ public final class Executor {
           e.getPropertyValue(CompressionProperty.class).orElse(null),
           e.getPropertyValue(DecompressionProperty.class).orElse(null))));
 
-      new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
-        metricMessageSender, persistentConnectionToMasterMap).execute();
+      final TaskExecutor executor = new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory,
+        broadcastManagerWorker, metricMessageSender, persistentConnectionToMasterMap, latencyMarkSendPeriod);
+      taskExecutorList.add(executor);
+      executor.execute();
     } catch (final Exception e) {
       persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
         ControlMessage.Message.newBuilder()
@@ -199,6 +220,9 @@ public final class Executor {
 
   public void terminate() {
     try {
+      if (periodicMetricService != null) {
+        periodicMetricService.shutdown();
+      }
       metricMessageSender.close();
     } catch (final UnknownFailureCauseException e) {
       throw new UnknownFailureCauseException(
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
index a0e7fcf..7ba581f 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/data/DataUtil.java
@@ -106,8 +106,8 @@ public final class DataUtil {
    * @param <K>                 the key type of the partitions.
    * @param memoryPoolAssigner  the memory pool assigner for DirectByteBufferOutputStream.
    * @return the converted {@link SerializedPartition}s.
-   * @throws IOException if fail to convert.
-   * @throws MemoryAllocationException  if fail to allocate memory.
+   * @throws IOException               if fail to convert.
+   * @throws MemoryAllocationException if fail to allocate memory.
    */
   public static <K extends Serializable> Iterable<SerializedPartition<K>> convertToSerPartitions(
     final Serializer serializer,
@@ -218,6 +218,7 @@ public final class DataUtil {
 
     private CountingInputStream serializedCountingStream = null;
     private CountingInputStream encodedCountingStream = null;
+    private boolean isLocal = false;
     private boolean hasNext = false;
     private T next;
     private boolean cannotContinueDecoding = false;
@@ -304,6 +305,27 @@ public final class DataUtil {
       }
       return numEncodedBytes;
     }
+
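+    // Unlike getNumSerializedBytes(), this may be called while the stream is still being consumed;
+    // it adds the bytes counted so far on the currently open stream.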
+    @Override
+    public long getCurrNumSerializedBytes() {
+      if (serializedCountingStream == null) {
+        return numSerializedBytes;
+      }
+      return numSerializedBytes + serializedCountingStream.getCount();
+    }
+
+    @Override
+    public long getCurrNumEncodedBytes() {
+      if (encodedCountingStream == null) {
+        return numEncodedBytes;
+      }
+      return numEncodedBytes + encodedCountingStream.getCount();
+    }
+
+    @Override
+    public boolean isReadNotSerializedData() {
+      return false;
+    }
   }
 
   /**
@@ -365,11 +387,21 @@ public final class DataUtil {
         }
 
         @Override
+        public long getCurrNumSerializedBytes() throws NumBytesNotSupportedException {
+          throw new NumBytesNotSupportedException();
+        }
+
+        @Override
         public long getNumEncodedBytes() throws NumBytesNotSupportedException {
           throw new NumBytesNotSupportedException();
         }
 
         @Override
+        public long getCurrNumEncodedBytes() throws NumBytesNotSupportedException {
+          throw new NumBytesNotSupportedException();
+        }
+
+        @Override
         public boolean hasNext() {
           return innerIterator.hasNext();
         }
@@ -378,6 +410,11 @@ public final class DataUtil {
         public E next() {
           return innerIterator.next();
         }
+
+        @Override
+        public boolean isReadNotSerializedData() {
+          return true;
+        }
       };
     }
 
@@ -400,11 +437,21 @@ public final class DataUtil {
         }
 
         @Override
+        public long getCurrNumSerializedBytes() {
+          return numSerializedBytes;
+        }
+
+        @Override
         public long getNumEncodedBytes() {
           return numEncodedBytes;
         }
 
         @Override
+        public long getCurrNumEncodedBytes() {
+          return numEncodedBytes;
+        }
+
+        @Override
         public boolean hasNext() {
           return innerIterator.hasNext();
         }
@@ -413,6 +460,11 @@ public final class DataUtil {
         public E next() {
           return innerIterator.next();
         }
+
+        @Override
+        public boolean isReadNotSerializedData() {
+          return false;
+        }
       };
     }
 
@@ -439,6 +491,17 @@ public final class DataUtil {
     long getNumSerializedBytes() throws NumBytesNotSupportedException;
 
     /**
+     * This method can be called before all of the data is taken out of the iterator.
+     * Once all of the data has been taken out of the iterator,
+     * its return value must equal the return value of getNumSerializedBytes().
+     *
+     * @return the number of currently read bytes in serialized form (which is, for example, encoded and compressed)
+     * @throws NumBytesNotSupportedException when the operation is not supported
+     * @throws IllegalStateException         when the information is not ready
+     */
+    long getCurrNumSerializedBytes() throws NumBytesNotSupportedException;
+
+    /**
      * This method should be called after the actual data is taken out of iterator,
      * since the existence of an iterator does not guarantee that data inside it is ready.
      *
@@ -447,5 +510,18 @@ public final class DataUtil {
      * @throws IllegalStateException         when the information is not ready
      */
     long getNumEncodedBytes() throws NumBytesNotSupportedException;
+
+    /**
+     * This method can be called before all of the data is taken out of the iterator.
+     * Once all of the data has been taken out of the iterator,
+     * its return value must equal the return value of getNumEncodedBytes().
+     *
+     * @return the number of currently read bytes in encoded form (which is ready to be decoded)
+     * @throws NumBytesNotSupportedException when the operation is not supported
+     * @throws IllegalStateException         when the information is not ready
+     */
+    long getCurrNumEncodedBytes() throws NumBytesNotSupportedException;
+
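+    /**
+     * @return whether the data was read in non-serialized form (e.g., from a local block).
+     */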
+    boolean isReadNotSerializedData();
   }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
index 97cde03..4bc0e5a 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/BlockOutputWriter.java
@@ -23,6 +23,7 @@ import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.partitioner.DedicatedKeyPerElement;
 import org.apache.nemo.common.partitioner.Partitioner;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
@@ -99,6 +100,11 @@ public final class BlockOutputWriter implements OutputWriter {
     // do nothing
   }
 
+  @Override
+  public void writeLatencymark(final Latencymark latencymark) {
+    // do nothing
+  }
+
   /**
    * Notifies that all writes for a block is end.
    * Further write about a committed block will throw an exception.
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
index 2012dee..42cf624 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/DataFetcherOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +62,11 @@ public final class DataFetcherOutputCollector<O> implements OutputCollector<O> {
   }
 
   @Override
+  public void emitLatencymark(final Latencymark latencymark) {
+    nextOperatorVertex.getTransform().onLatencymark(latencymark);
+  }
+
+  @Override
   public <T> void emit(final String dstVertexId, final T output) {
     throw new RuntimeException("No additional output tag in DataFetcherOutputCollector");
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
index 2de9fdf..ad26a14 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventDecoderFactory.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.nemo.common.coder.DecoderFactory;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -70,22 +71,25 @@ public final class NemoEventDecoderFactory implements DecoderFactory {
     @Override
     public Object decode() throws IOException {
 
-      final byte isWatermark = (byte) inputStream.read();
-      if (isWatermark == -1) {
+      final byte dataType = (byte) inputStream.read();
+      if (dataType == -1) {
         // end of the input stream
         throw new EOFException();
       }
 
-      if (isWatermark == 0x00) {
+      if (dataType == 0x00) {
         // this is not a watermark
         return valueDecoder.decode();
-      } else if (isWatermark == 0x01) {
+      } else if (dataType == 0x01) {
         // this is a watermark
         final WatermarkWithIndex watermarkWithIndex =
           (WatermarkWithIndex) SerializationUtils.deserialize(inputStream);
         return watermarkWithIndex;
+      } else if (dataType == 0x02) {
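+        // this is a latencymark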
+        final Latencymark latencymark = (Latencymark) SerializationUtils.deserialize(inputStream);
+        return latencymark;
       } else {
-        throw new RuntimeException("Watermark decoding failure: " + isWatermark);
+        throw new RuntimeException("Element decoding failure: " + dataType);
       }
     }
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
index cb07ffd..5f96360 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/NemoEventEncoderFactory.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.commons.lang.SerializationUtils;
 import org.apache.nemo.common.coder.EncoderFactory;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,9 @@ public final class NemoEventEncoderFactory implements EncoderFactory {
       if (element instanceof WatermarkWithIndex) {
         outputStream.write(0x01); // this is watermark
         outputStream.write(SerializationUtils.serialize((Serializable) element));
+      } else if (element instanceof Latencymark) {
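+        // this is a latencymark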
+        outputStream.write(0x02);
+        outputStream.write(SerializationUtils.serialize((Serializable) element));
       } else {
         outputStream.write(0x00); // this is a data element
         valueEncoder.encode(element);
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
index ca5a2bc..319e079 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java
@@ -21,6 +21,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -131,4 +132,33 @@ public final class OperatorVertexOutputCollector<O> implements OutputCollector<O
       }
     }
   }
+
+  @Override
+  public void emitLatencymark(final Latencymark latencymark) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("{} emits latencymark {}", irVertex.getId(), latencymark);
+    }
+
+    // Emit latencymark to internal vertices
+    for (final NextIntraTaskOperatorInfo internalVertex : internalMainOutputs) {
+      internalVertex.getNextOperator().getTransform().onLatencymark(latencymark);
+    }
+
+    for (final List<NextIntraTaskOperatorInfo> internalVertices : internalAdditionalOutputs.values()) {
+      for (final NextIntraTaskOperatorInfo internalVertex : internalVertices) {
+        internalVertex.getNextOperator().getTransform().onLatencymark(latencymark);
+      }
+    }
+
+    // Emit latencymark to output writer
+    for (final OutputWriter outputWriter : externalMainOutputs) {
+      outputWriter.writeLatencymark(latencymark);
+    }
+
+    for (final List<OutputWriter> externalVertices : externalAdditionalOutputs.values()) {
+      for (final OutputWriter externalVertex : externalVertices) {
+        externalVertex.writeLatencymark(latencymark);
+      }
+    }
+  }
 }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
index 319a30f..082d048 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorWatermarkCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.OperatorVertex;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +49,11 @@ public final class OperatorWatermarkCollector implements OutputCollector {
   }
 
   @Override
+  public void emitLatencymark(final Latencymark latencymark) {
+    throw new IllegalStateException("Should not be called");
+  }
+
+  @Override
   public void emit(final String dstVertexId, final Object output) {
     throw new IllegalStateException("Should not be called");
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
index bf6ff84..586a048 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OutputWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.datatransfer;
 
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.Optional;
@@ -41,6 +42,15 @@ public interface OutputWriter {
   void writeWatermark(Watermark watermark);
 
   /**
+   * Writes latencymark to all edges.
+   * The latencymark is transferred to the next task immediately,
+   * without waiting for data buffered in windows.
+   *
+   * @param latencymark latencymark
+   */
+  void writeLatencymark(Latencymark latencymark);
+
+  /**
    * @return the total written bytes.
    */
   Optional<Long> getWrittenBytes();
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
index 544d64d..c6c82f0 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/PipeOutputWriter.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.edge.executionproperty.CommunicationPatternProperty;
 import org.apache.nemo.common.partitioner.Partitioner;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
@@ -109,6 +110,15 @@ public final class PipeOutputWriter implements OutputWriter {
   }
 
   @Override
+  public void writeLatencymark(final Latencymark latencymark) {
+    if (!initialized) {
+      doInitialize();
+    }
+
+    writeData(latencymark, pipes);
+  }
+
+  @Override
   public Optional<Long> getWrittenBytes() {
     return Optional.empty();
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
index b353e38..7bd9af4 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/RunTimeMessageOutputCollector.java
@@ -20,6 +20,7 @@ package org.apache.nemo.runtime.executor.datatransfer;
 
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
@@ -103,6 +104,11 @@ public final class RunTimeMessageOutputCollector<O> implements OutputCollector<O
   }
 
   @Override
+  public void emitLatencymark(final Latencymark latencymark) {
+    // do nothing
+  }
+
+  @Override
   public <T> void emit(final String dstVertexId, final T output) {
     throw new IllegalStateException("Dynamic optimization does not emit tagged data");
   }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
index 797818c..9b39359 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/MultiThreadParentTaskDataFetcher.java
@@ -18,9 +18,11 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.vertex.IRVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.executor.data.DataUtil;
 import org.apache.nemo.runtime.executor.datatransfer.*;
@@ -56,6 +58,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
   private boolean firstFetch = true;
 
   private final ConcurrentLinkedQueue elementQueue;
+  private final ConcurrentLinkedQueue<DataUtil.IteratorWithNumBytes> iterators;
 
   private long serBytes = 0;
   private long encodedBytes = 0;
@@ -74,6 +77,7 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
     this.readersForParentTask = readerForParentTask;
     this.firstFetch = true;
     this.elementQueue = new ConcurrentLinkedQueue();
+    this.iterators = new ConcurrentLinkedQueue<>();
     this.queueInsertionThreads = Executors.newCachedThreadPool();
   }
 
@@ -114,6 +118,8 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
       // A thread for each iterator
       queueInsertionThreads.submit(() -> {
         if (exception == null) {
+          iterators.add(iterator);
+
           // Consume this iterator to the end.
           while (iterator.hasNext()) { // blocked on the iterator.
             final Object element = iterator.next();
@@ -166,6 +172,26 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
     }
   }
 
+  public Pair<Boolean, Long> getCurrSerBytes() {
+    try {
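+      // Sum the serialized bytes read so far across all iterators,
+      // and flag whether any iterator read local (non-serialized) data.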
+      long currSerBytes = 0;
+      boolean isReadNotSerializedData = false;
+      for (DataUtil.IteratorWithNumBytes iterator : iterators) {
+        if (!iterator.isReadNotSerializedData()) {
+          currSerBytes += iterator.getCurrNumSerializedBytes();
+        } else {
+          isReadNotSerializedData = true;
+        }
+      }
+      return Pair.of(isReadNotSerializedData, currSerBytes);
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      return Pair.of(false, -1L);
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of currently serialized data", e);
+      return Pair.of(false, -1L);
+    }
+  }
+
   @Override
   public void close() throws Exception {
     queueInsertionThreads.shutdown();
@@ -187,6 +213,11 @@ class MultiThreadParentTaskDataFetcher extends DataFetcher {
     }
 
     @Override
+    public void emitLatencymark(final Latencymark latencymark) {
+      throw new IllegalStateException("Should not be called");
+    }
+
+    @Override
     public void emit(final String dstVertexId, final Object output) {
       throw new IllegalStateException("Should not be called");
     }
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
index a8ae4a9..e5bcb29 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/ParentTaskDataFetcher.java
@@ -18,6 +18,7 @@
  */
 package org.apache.nemo.runtime.executor.task;
 
+import org.apache.nemo.common.Pair;
 import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.edge.executionproperty.BlockFetchFailureProperty;
 import org.apache.nemo.common.ir.vertex.IRVertex;
@@ -45,6 +46,7 @@ class ParentTaskDataFetcher extends DataFetcher {
 
   // Non-finals (lazy fetching)
   private boolean firstFetch;
+  private boolean readNotSerialized;
   private int expectedNumOfIterators;
   private DataUtil.IteratorWithNumBytes currentIterator;
   private int currentIteratorIndex;
@@ -57,6 +59,7 @@ class ParentTaskDataFetcher extends DataFetcher {
     super(dataSource, outputCollector);
     this.inputReader = inputReader;
     this.firstFetch = true;
+    this.readNotSerialized = false;
     this.currentIteratorIndex = 0;
     this.iteratorQueue = new LinkedBlockingQueue<>();
   }
@@ -191,6 +194,22 @@ class ParentTaskDataFetcher extends DataFetcher {
     }
   }
 
+  public Pair<Boolean, Long> getCurrSerBytes() {
+    try {
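+      // Report the bytes counted from previously consumed iterators
+      // plus the bytes read so far from the current iterator.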
+      if (currentIterator == null) {
+        return Pair.of(false, -1L);
+      } else if (currentIterator.isReadNotSerializedData()) {
+        return Pair.of(true, 0L);
+      }
+      return Pair.of(currentIterator.isReadNotSerializedData(), serBytes + currentIterator.getCurrNumSerializedBytes());
+    } catch (final DataUtil.IteratorWithNumBytes.NumBytesNotSupportedException e) {
+      return Pair.of(false, -1L);
+    } catch (final IllegalStateException e) {
+      LOG.error("Failed to get the number of bytes of currently serialized data", e);
+      return Pair.of(false, -1L);
+    }
+  }
+
   @Override
   public void close() throws Exception {
 
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
index 2d82898..d42099e 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/SourceVertexDataFetcher.java
@@ -22,6 +22,7 @@ import org.apache.nemo.common.ir.OutputCollector;
 import org.apache.nemo.common.ir.Readable;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 
 import java.util.concurrent.Executors;
@@ -33,27 +34,38 @@ import java.util.concurrent.TimeUnit;
  */
 class SourceVertexDataFetcher extends DataFetcher {
   private final Readable readable;
+  private final String taskId;
   private long boundedSourceReadTime = 0;
   private static final long WATERMARK_PERIOD = 1000; // ms
-  private final ScheduledExecutorService watermarkTriggerService;
+  private static final long LATENCYMARK_PERIOD = 1000; // ms
+  private final ScheduledExecutorService streamMarkTriggerService;
   private boolean watermarkTriggered = false;
+  private boolean latencyMarkTriggered = false;
   private final boolean bounded;
 
   SourceVertexDataFetcher(final SourceVertex dataSource,
                           final Readable readable,
-                          final OutputCollector outputCollector) {
+                          final OutputCollector outputCollector,
+                          final long latencyMarkSendPeriod,
+                          final String taskId) {
     super(dataSource, outputCollector);
+    this.taskId = taskId;
     this.readable = readable;
     this.readable.prepare();
     this.bounded = dataSource.isBounded();
 
     if (!bounded) {
-      this.watermarkTriggerService = Executors.newScheduledThreadPool(1);
-      this.watermarkTriggerService.scheduleAtFixedRate(() ->
+      this.streamMarkTriggerService = Executors.newScheduledThreadPool(1);
+      this.streamMarkTriggerService.scheduleAtFixedRate(() ->
         watermarkTriggered = true,
         WATERMARK_PERIOD, WATERMARK_PERIOD, TimeUnit.MILLISECONDS);
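+      // schedule latencymark triggers only when a latencymark period is configured (-1 disables it)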
+      if (latencyMarkSendPeriod != -1) {
+        this.streamMarkTriggerService.scheduleAtFixedRate(() ->
+            latencyMarkTriggered = true,
+          latencyMarkSendPeriod, latencyMarkSendPeriod, TimeUnit.MILLISECONDS);
+      }
     } else {
-      this.watermarkTriggerService = null;
+      this.streamMarkTriggerService = null;
     }
   }
 
@@ -81,8 +93,8 @@ class SourceVertexDataFetcher extends DataFetcher {
   @Override
   public void close() throws Exception {
     readable.close();
-    if (watermarkTriggerService != null) {
-      watermarkTriggerService.shutdown();
+    if (streamMarkTriggerService != null) {
+      streamMarkTriggerService.shutdown();
     }
   }
 
@@ -95,10 +107,23 @@ class SourceVertexDataFetcher extends DataFetcher {
     }
   }
 
+  private boolean isLatencyMarkTriggered() {
+    if (latencyMarkTriggered) {
+      latencyMarkTriggered = false;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
   private Object retrieveElement() {
     // Emit watermark
-    if (!bounded && isWatermarkTriggerTime()) {
-      return new Watermark(readable.readWatermark());
+    if (!bounded) {
+      if (isWatermarkTriggerTime()) {
+        return new Watermark(readable.readWatermark());
+      } else if (isLatencyMarkTriggered()) {
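+        // emit a latencymark stamped with this task's id and the current timestamp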
+        return new Latencymark(taskId, System.currentTimeMillis());
+      }
     }
 
     // Data
diff --git a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
index 2bf574d..7b8f99b 100644
--- a/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
+++ b/runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java
@@ -34,11 +34,14 @@ import org.apache.nemo.common.ir.vertex.transform.MessageAggregatorTransform;
 import org.apache.nemo.common.ir.vertex.transform.SignalTransform;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
 import org.apache.nemo.common.punctuation.Finishmark;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.comm.ControlMessage;
 import org.apache.nemo.runtime.common.message.MessageEnvironment;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
+import org.apache.nemo.runtime.common.metric.LatencyMetric;
+import org.apache.nemo.runtime.common.metric.StreamMetric;
 import org.apache.nemo.runtime.common.plan.RuntimeEdge;
 import org.apache.nemo.runtime.common.plan.StageEdge;
 import org.apache.nemo.runtime.common.plan.Task;
@@ -53,7 +56,9 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 /**
@@ -74,11 +79,17 @@ public final class TaskExecutor {
   private final List<VertexHarness> sortedHarnesses;
 
   // Metrics information
+
   private long boundedSourceReadTime = 0;
   private long serializedReadBytes = 0;
   private long encodedReadBytes = 0;
   private long timeSinceLastExecution;
+  private long timeSinceLastRecordStreamMetric;
+  private final Map<String, AtomicLong> numOfReadTupleMap;
+  private final Map<String, Long> lastSerializedReadByteMap;
   private final MetricMessageSender metricMessageSender;
+  private long latencyMarkSendPeriod = -1;
+  private final Map<String, Long> latestSentLatencymarkTimestamp;
 
   // Dynamic optimization
   private String idOfVertexPutOnHold;
@@ -102,12 +113,15 @@ public final class TaskExecutor {
                       final IntermediateDataIOFactory intermediateDataIOFactory,
                       final BroadcastManagerWorker broadcastManagerWorker,
                       final MetricMessageSender metricMessageSender,
-                      final PersistentConnectionToMasterMap persistentConnectionToMasterMap) {
+                      final PersistentConnectionToMasterMap persistentConnectionToMasterMap,
+                      final int latencyMarkPeriod) {
     // Essential information
     this.isExecuted = false;
     this.taskId = task.getTaskId();
     this.taskStateManager = taskStateManager;
     this.broadcastManagerWorker = broadcastManagerWorker;
+    this.latencyMarkSendPeriod = latencyMarkPeriod;
+    this.latestSentLatencymarkTimestamp = new HashMap<>();
 
     // Metric sender
     this.metricMessageSender = metricMessageSender;
@@ -123,9 +137,61 @@ public final class TaskExecutor {
     this.dataFetchers = pair.left();
     this.sortedHarnesses = pair.right();
 
+    // initialize metrics
+    this.numOfReadTupleMap = new HashMap<>();
+    this.lastSerializedReadByteMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      this.numOfReadTupleMap.put(dataFetcher.getDataSource().getId(), new AtomicLong());
+      this.lastSerializedReadByteMap.put(dataFetcher.getDataSource().getId(), 0L);
+    }
+    // record the start time of the first stream metric period
+    this.timeSinceLastRecordStreamMetric = System.currentTimeMillis();
     this.timeSinceLastExecution = System.currentTimeMillis();
   }
 
+  /**
+   * Send the stream metric to the runtime master.
+   * This method should be called on a thread separate from the TaskExecutor thread,
+   * because it can significantly affect performance.
+   */
+  public void sendStreamMetric() {
+    long currentTimestamp = System.currentTimeMillis();
+
+    Map<String, StreamMetric> streamMetricMap = new HashMap<>();
+    for (DataFetcher dataFetcher : dataFetchers) {
+      String sourceVertexId = dataFetcher.getDataSource().getId();
+
+      Pair<Boolean, Long> serReadBytes = Pair.of(false, -1L);
+
+      if (dataFetcher instanceof SourceVertexDataFetcher) {
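+        // data read directly from a source vertex is never in serialized form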
+        serReadBytes = Pair.of(true, 0L);
+      } else if (dataFetcher instanceof ParentTaskDataFetcher) {
+        serReadBytes = ((ParentTaskDataFetcher) dataFetcher).getCurrSerBytes();
+      } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
+        serReadBytes = ((MultiThreadParentTaskDataFetcher) dataFetcher).getCurrSerBytes();
+      }
+
+      // a serialized read byte value of -1 means it is invalid, so skip the delta computation
+      if (serReadBytes.right() != -1) {
+        long lastSerializedReadBytes = lastSerializedReadByteMap.get(sourceVertexId);
+        lastSerializedReadByteMap.put(sourceVertexId, serReadBytes.right());
+        serReadBytes = Pair.of(serReadBytes.left(), serReadBytes.right() - lastSerializedReadBytes);
+      }
+
+      long numOfTuples = this.numOfReadTupleMap.get(sourceVertexId).get();
+
+      StreamMetric streamMetric = new StreamMetric(this.timeSinceLastRecordStreamMetric, currentTimestamp, numOfTuples,
+        serReadBytes.right(), serReadBytes.left());
+      streamMetricMap.put(sourceVertexId, streamMetric);
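+      // subtract the reported count instead of resetting it,
+      // so tuples counted concurrently by the task thread are not lost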
+      numOfReadTupleMap.get(sourceVertexId).addAndGet(-numOfTuples);
+    }
+
+    metricMessageSender.send(TASK_METRIC_ID, taskId, "streamMetric",
+      SerializationUtils.serialize((Serializable) streamMetricMap));
+
+    this.timeSinceLastRecordStreamMetric = currentTimestamp;
+  }
+
   // Get all of the intra-task edges + inter-task edges
   private List<Edge> getAllIncomingEdges(
     final Task task,
@@ -236,7 +302,7 @@ public final class TaskExecutor {
         outputCollector = new RunTimeMessageOutputCollector<Map<Object, Long>>(
           taskId, irVertex, persistentConnectionToMasterMap, this, true);
       } else if (irVertex instanceof OperatorVertex
-      && ((OperatorVertex) irVertex).getTransform() instanceof SignalTransform) {
+        && ((OperatorVertex) irVertex).getTransform() instanceof SignalTransform) {
         outputCollector = new RunTimeMessageOutputCollector<Map<String, Long>>(
           taskId, irVertex, persistentConnectionToMasterMap, this, false);
       } else {
@@ -260,7 +326,9 @@ public final class TaskExecutor {
         dataFetcherList.add(new SourceVertexDataFetcher(
           (SourceVertex) irVertex,
           sourceReader.get(),
-          outputCollector));
+          outputCollector,
+          latencyMarkSendPeriod,
+          taskId));
       }
 
       // Parent-task read
@@ -317,6 +385,11 @@ public final class TaskExecutor {
     outputCollector.emitWatermark(watermark);
   }
 
+  private void processLatencymark(final OutputCollector outputCollector,
+                                  final Latencymark latencymark) {
+    outputCollector.emitLatencymark(latencymark);
+  }
+
   /**
    * Execute a task, while handling unrecoverable errors and exceptions.
    */
@@ -351,21 +424,16 @@ public final class TaskExecutor {
       return;
     }
 
-    metricMessageSender.send(TASK_METRIC_ID, taskId, "boundedSourceReadTime",
-      SerializationUtils.serialize(boundedSourceReadTime));
-    metricMessageSender.send(TASK_METRIC_ID, taskId, "serializedReadBytes",
-      SerializationUtils.serialize(serializedReadBytes));
-    metricMessageSender.send(TASK_METRIC_ID, taskId, "encodedReadBytes",
-      SerializationUtils.serialize(encodedReadBytes));
+    sendMetrics();
 
     // Phase 2: Finalize task-internal states and elements
     for (final VertexHarness vertexHarness : sortedHarnesses) {
       finalizeVertex(vertexHarness);
     }
 
-    metricMessageSender.send(TASK_METRIC_ID, taskId, "taskDuration",
-      SerializationUtils.serialize(System.currentTimeMillis() - executionStartTime));
     this.timeSinceLastExecution = System.currentTimeMillis();
+    metricMessageSender.send(TASK_METRIC_ID, taskId, "taskDuration",
+      SerializationUtils.serialize(timeSinceLastExecution - executionStartTime));
     if (idOfVertexPutOnHold == null) {
       taskStateManager.onTaskStateChanged(TaskState.State.COMPLETE, Optional.empty(), Optional.empty());
       LOG.info("{} completed", taskId);
@@ -377,6 +445,22 @@ public final class TaskExecutor {
     }
   }
 
+  /**
+   * Send data-processing metrics.
+   */
+  public void sendMetrics() {
+    metricMessageSender.send(TASK_METRIC_ID, taskId, "boundedSourceReadTime",
+      SerializationUtils.serialize(boundedSourceReadTime));
+    metricMessageSender.send(TASK_METRIC_ID, taskId, "serializedReadBytes",
+      SerializationUtils.serialize(serializedReadBytes));
+    metricMessageSender.send(TASK_METRIC_ID, taskId, "encodedReadBytes",
+      SerializationUtils.serialize(encodedReadBytes));
+  }
+
+  /**
+   * Finalize the vertex.
+   * @param vertexHarness the vertex harness.
+   */
   private void finalizeVertex(final VertexHarness vertexHarness) {
     closeTransform(vertexHarness);
     finalizeOutputWriters(vertexHarness);
@@ -402,12 +486,34 @@ public final class TaskExecutor {
         serializedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getSerializedBytes();
         encodedReadBytes += ((MultiThreadParentTaskDataFetcher) dataFetcher).getEncodedBytes();
       }
+    } else if (event instanceof Latencymark) {
+      Latencymark latencymark = (Latencymark) event;
+      long currTimestamp = System.currentTimeMillis();
+
+      // send latencyMetric to RuntimeMaster
+      LatencyMetric metric = new LatencyMetric(latencymark, currTimestamp);
+      metricMessageSender.send(TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(metric));
+
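+      // forward the latencymark downstream only if it is newer than
+      // the last one forwarded from the same source task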
+      long latestSentTimestamp = latestSentLatencymarkTimestamp.getOrDefault(latencymark.getCreatedTaskId(), -1L);
+      if (latestSentTimestamp < latencymark.getCreatedTimestamp()) {
+        latestSentLatencymarkTimestamp.put(latencymark.getCreatedTaskId(), latencymark.getCreatedTimestamp());
+
+        // set previousTaskId and timestamp of latencymark for next task.
+        latencymark.setPreviousTaskId(taskId);
+        latencymark.setPreviousSentTimestamp(currTimestamp);
+
+        // process latencymark for downstream tasks
+        processLatencymark(dataFetcher.getOutputCollector(), latencymark);
+      }
     } else if (event instanceof Watermark) {
       // Watermark
       processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
     } else {
       // Process data element
       processElement(dataFetcher.getOutputCollector(), event);
+
+      // increase the number of read tuples
+      numOfReadTupleMap.get(dataFetcher.getDataSource().getId()).incrementAndGet();
     }
   }
 
@@ -418,9 +524,7 @@ public final class TaskExecutor {
    * @param currentTime   current time
    * @param prevTime      prev time
    */
-  private boolean isPollingTime(final long pollingPeriod,
-                                final long currentTime,
-                                final long prevTime) {
+  private boolean isPollingTime(final long pollingPeriod, final long currentTime, final long prevTime) {
     return (currentTime - prevTime) >= pollingPeriod;
   }
 
@@ -480,7 +584,6 @@ public final class TaskExecutor {
       final Iterator<DataFetcher> pendingIterator = pendingFetchers.iterator();
       final long currentTime = System.currentTimeMillis();
 
-
       if (isPollingTime(pollingInterval, currentTime, prevPollingTime)) {
         // We check pending data every polling interval
         prevPollingTime = currentTime;
diff --git a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
index 3e33fec..abadc00 100644
--- a/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
+++ b/runtime/executor/src/test/java/org/apache/nemo/runtime/executor/task/TaskExecutorTest.java
@@ -37,6 +37,7 @@ import org.apache.nemo.common.ir.vertex.OperatorVertex;
 import org.apache.nemo.common.ir.vertex.SourceVertex;
 import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
 import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.common.punctuation.Latencymark;
 import org.apache.nemo.common.punctuation.Watermark;
 import org.apache.nemo.runtime.common.RuntimeIdManager;
 import org.apache.nemo.runtime.common.message.PersistentConnectionToMasterMap;
@@ -225,8 +226,9 @@ public final class TaskExecutorTest {
     final Map<String, Readable> vertexIdToReadable = new HashMap<>();
     vertexIdToReadable.put(sourceIRVertex.getId(), readable);
     final List<Watermark> emittedWatermarks = new LinkedList<>();
+    final List<Latencymark> emittedLatencymarks = new LinkedList<>();
 
-    final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks);
+    final Transform transform = new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencymarks);
     final OperatorVertex operatorVertex = new OperatorVertex(transform);
 
     final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag =
@@ -317,8 +319,9 @@ public final class TaskExecutorTest {
   @Test()
   public void testMultipleIncomingEdges() throws Exception {
     final List<Watermark> emittedWatermarks = new ArrayList<>();
+    final List<Latencymark> emittedLatencymarks = new ArrayList<>();
     final IRVertex operatorIRVertex1 = new OperatorVertex(new StreamTransform());
-    final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks));
+    final IRVertex operatorIRVertex2 = new OperatorVertex(new StreamTransformNoWatermarkEmit(emittedWatermarks, emittedLatencymarks));
     final IRVertex operatorIRVertex3 = new OperatorVertex(new StreamTransform());
 
     final IRVertex sourceIRVertex1 = new TestUnboundedSourceVertex();
@@ -627,9 +630,11 @@ public final class TaskExecutorTest {
   private class StreamTransformNoWatermarkEmit<T> implements Transform<T, T> {
     private OutputCollector<T> outputCollector;
     private final List<Watermark> emittedWatermarks;
+    private final List<Latencymark> emittedLatencymarks;
 
-    StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks) {
+    StreamTransformNoWatermarkEmit(final List<Watermark> emittedWatermarks, final List<Latencymark> emittedLatencymarks) {
       this.emittedWatermarks = emittedWatermarks;
+      this.emittedLatencymarks = emittedLatencymarks;
     }
 
     @Override
@@ -643,6 +648,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onLatencymark(final Latencymark latencymark) {
+      emittedLatencymarks.add(latencymark);
+    }
+
+    @Override
     public void onData(final Object element) {
       outputCollector.emit((T) element);
     }
@@ -765,6 +775,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onLatencymark(final Latencymark latencymark) {
+      outputCollector.emitLatencymark(latencymark);
+    }
+
+    @Override
     public void onData(final Object element) {
       outputCollector.emit((T) element);
     }
@@ -796,6 +811,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onLatencymark(final Latencymark latencymark) {
+      // do nothing
+    }
+
+    @Override
     public void onData(final Object element) {
       list.add((T) element);
     }
@@ -832,6 +852,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onLatencymark(final Latencymark latencymark) {
+      outputCollector.emitLatencymark(latencymark);
+    }
+
+    @Override
     public void onData(final Object element) {
       final Object broadcastVariable = context.getBroadcastVariable(broadcastVariableId);
       outputCollector.emit((T) Pair.of(broadcastVariable, element));
@@ -877,6 +902,11 @@ public final class TaskExecutorTest {
     }
 
     @Override
+    public void onLatencymark(final Latencymark latencymark) {
+      outputCollector.emitLatencymark(latencymark);
+    }
+
+    @Override
     public void close() {
       // Do nothing.
     }
@@ -897,6 +927,6 @@ public final class TaskExecutorTest {
 
   private TaskExecutor getTaskExecutor(final Task task, final DAG<IRVertex, RuntimeEdge<IRVertex>> taskDag) {
     return new TaskExecutor(task, taskDag, taskStateManager, intermediateDataIOFactory, broadcastManagerWorker,
-      metricMessageSender, persistentConnectionToMasterMap);
+      metricMessageSender, persistentConnectionToMasterMap, -1);
   }
 }