You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nw...@apache.org on 2018/12/19 21:07:06 UTC

[incubator-heron] branch master updated: Add CumulativeCountMetric and metrics to monitor instance task and metrics collection (#3127)

This is an automated email from the ASF dual-hosted git repository.

nwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c42171  Add CumulativeCountMetric and metrics to monitor instance task and metrics collection (#3127)
5c42171 is described below

commit 5c42171a29d3d32f114196722a1e027c5309354e
Author: Ning Wang <nw...@twitter.com>
AuthorDate: Wed Dec 19 13:07:02 2018 -0800

    Add CumulativeCountMetric and metrics to monitor instance task and metrics collection (#3127)
---
 .../heron/api/metric/CumulativeCountMetric.java    | 32 +++++++++++
 .../heron/common/utils/metrics/BoltMetrics.java    | 38 ++++++++++++-
 .../common/utils/metrics/FullBoltMetrics.java      | 37 ++++++++++++-
 .../common/utils/metrics/FullSpoutMetrics.java     | 36 +++++++++++-
 .../heron/common/utils/metrics/IBoltMetrics.java   | 62 +++++++++++++++++++++
 .../heron/common/utils/metrics/ISpoutMetrics.java  | 64 ++++++++++++++++++++++
 .../common/utils/metrics/MetricsCollector.java     | 17 +++++-
 .../heron/common/utils/metrics/SpoutMetrics.java   | 36 +++++++++++-
 .../apache/heron/instance/bolt/BoltInstance.java   |  7 ++-
 .../instance/bolt/BoltOutputCollectorImpl.java     |  6 +-
 .../apache/heron/instance/spout/SpoutInstance.java |  8 ++-
 11 files changed, 327 insertions(+), 16 deletions(-)

diff --git a/heron/api/src/java/org/apache/heron/api/metric/CumulativeCountMetric.java b/heron/api/src/java/org/apache/heron/api/metric/CumulativeCountMetric.java
new file mode 100644
index 0000000..b357f2e
--- /dev/null
+++ b/heron/api/src/java/org/apache/heron/api/metric/CumulativeCountMetric.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.api.metric;
+
+/**
+ * A counter whose value is not reset when it is fetched, so it
+ * keeps accumulating across metric collections.
+ */
+public class CumulativeCountMetric extends CountMetric {
+  @Override
+  public Long getValueAndReset() {
+    // Return value without resetting.
+    return getValue();
+  }
+}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
index bd40180..967974d 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
@@ -20,6 +20,7 @@
 package org.apache.heron.common.utils.metrics;
 
 import org.apache.heron.api.metric.CountMetric;
+import org.apache.heron.api.metric.CumulativeCountMetric;
 import org.apache.heron.api.metric.MeanReducer;
 import org.apache.heron.api.metric.MeanReducerState;
 import org.apache.heron.api.metric.ReducedMetric;
@@ -35,9 +36,12 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl;
  * 2. New them in the constructor
  * 3. Register them in registerMetrics(...) by using MetricsCollector's registerMetric(...)
  * 4. Expose methods which could be called externally to change the value of metrics
+ *
+ * This is a fast bolt metrics object and it is NOT used in heron core. To use this metrics
+ * object, go to BoltInstance.java and replace "new FullBoltMetrics" with "new BoltMetrics".
  */
 
-public class BoltMetrics implements ComponentMetrics {
+public class BoltMetrics implements IBoltMetrics {
   private final CountMetric ackCount;
   private final ReducedMetric<MeanReducerState, Number, Double> processLatency;
   private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
@@ -51,8 +55,18 @@ public class BoltMetrics implements ComponentMetrics {
   // The # of times back-pressure happens on outStreamQueue
   // so instance could not produce more tuples
   private final CountMetric outQueueFullCount;
-
-
+  /*
+   * Metrics for how many times bolt instance task is run.
+   */
+  private CumulativeCountMetric taskRunCount;
+  /*
+   * Metrics for how many times bolt tuple execution (readTuplesAndExecute) is run.
+   */
+  private CumulativeCountMetric executionCount;
+  /*
+   * Metrics for how many times bolt has more work to do and wakes itself up.
+   */
+  private CumulativeCountMetric continueWorkCount;
 
   public BoltMetrics() {
     ackCount = new CountMetric();
@@ -64,6 +78,9 @@ public class BoltMetrics implements ComponentMetrics {
     emitCount = new CountMetric();
     outQueueFullCount = new CountMetric();
     tupleAddedToQueue = new CountMetric();
+    taskRunCount = new CumulativeCountMetric();
+    executionCount = new CumulativeCountMetric();
+    continueWorkCount = new CumulativeCountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -82,6 +99,9 @@ public class BoltMetrics implements ComponentMetrics {
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
     topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
         tupleAddedToQueue, interval);
+    topologyContext.registerMetric("__task-run-count", taskRunCount, interval);
+    topologyContext.registerMetric("__execution-count", executionCount, interval);
+    topologyContext.registerMetric("__continue-work-count", continueWorkCount, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -126,4 +146,16 @@ public class BoltMetrics implements ComponentMetrics {
 
   public void serializeDataTuple(String streamId, long latency) {
   }
+
+  public void updateTaskRunCount() {
+    taskRunCount.incr();
+  }
+
+  public void updateExecutionCount() {
+    executionCount.incr();
+  }
+
+  public void updateContinueWorkCount() {
+    continueWorkCount.incr();
+  }
 }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
index dfda1c5..bbff2b6 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.api.metric.CountMetric;
+import org.apache.heron.api.metric.CumulativeCountMetric;
 import org.apache.heron.api.metric.MeanReducer;
 import org.apache.heron.api.metric.MeanReducerState;
 import org.apache.heron.api.metric.MultiCountMetric;
@@ -39,9 +40,12 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl;
  * 2. New them in the constructor
  * 3. Register them in registerMetrics(...) by using MetricsCollector's registerMetric(...)
  * 4. Expose methods which could be called externally to change the value of metrics
+ *
+ * This is a bolt metrics object with more information; it is used in heron core. To switch to
+ * the faster one, replace "new FullBoltMetrics" with "new BoltMetrics" in BoltInstance.java.
  */
 
-public class FullBoltMetrics extends BoltMetrics {
+public class FullBoltMetrics implements IBoltMetrics {
   private final MultiCountMetric ackCount;
   private final MultiReducedMetric<MeanReducerState, Number, Double> processLatency;
   private final MultiReducedMetric<MeanReducerState, Number, Double> failLatency;
@@ -61,6 +65,18 @@ public class FullBoltMetrics extends BoltMetrics {
   // The # of times back-pressure happens on outStreamQueue
   // so instance could not produce more tuples
   private final CountMetric outQueueFullCount;
+  /*
+   * Metrics for how many times bolt instance task is run.
+   */
+  private CumulativeCountMetric taskRunCount;
+  /*
+   * Metrics for how many times bolt tuple execution (readTuplesAndExecute) is run.
+   */
+  private CumulativeCountMetric executionCount;
+  /*
+   * Metrics for how many times bolt has more work to do and wakes itself up.
+   */
+  private CumulativeCountMetric continueWorkCount;
 
   public FullBoltMetrics() {
     ackCount = new MultiCountMetric();
@@ -73,6 +89,9 @@ public class FullBoltMetrics extends BoltMetrics {
     emitCount = new MultiCountMetric();
     outQueueFullCount = new CountMetric();
     tupleAddedToQueue = new CountMetric();
+    taskRunCount = new CumulativeCountMetric();
+    executionCount = new CumulativeCountMetric();
+    continueWorkCount = new CumulativeCountMetric();
 
     totalDeserializationTimeNs = new MultiCountMetric();
     totalSerializationTimeNs = new MultiCountMetric();
@@ -96,6 +115,10 @@ public class FullBoltMetrics extends BoltMetrics {
     topologyContext.registerMetric("__execute-time-ns", executeTimeNs, interval);
     topologyContext.registerMetric("__emit-count", emitCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
+    topologyContext.registerMetric("__task-run-count", taskRunCount, interval);
+    topologyContext.registerMetric("__execution-count", executionCount, interval);
+    topologyContext.registerMetric("__continue-work-count", continueWorkCount, interval);
+
     topologyContext.registerMetric(
         "__tuple-deserialization-time-ns", totalDeserializationTimeNs, interval);
     topologyContext.registerMetric(
@@ -206,5 +229,17 @@ public class FullBoltMetrics extends BoltMetrics {
     totalSerializationTimeNs.scope(streamId).incrBy(latency);
     averageSerializationTimeNs.scope(streamId).update(latency);
   }
+
+  public void updateTaskRunCount() {
+    taskRunCount.incr();
+  }
+
+  public void updateExecutionCount() {
+    executionCount.incr();
+  }
+
+  public void updateContinueWorkCount() {
+    continueWorkCount.incr();
+  }
 }
 
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
index 866bdf2..142bf73 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.heron.api.generated.TopologyAPI;
 import org.apache.heron.api.metric.CountMetric;
+import org.apache.heron.api.metric.CumulativeCountMetric;
 import org.apache.heron.api.metric.MeanReducer;
 import org.apache.heron.api.metric.MeanReducerState;
 import org.apache.heron.api.metric.MultiCountMetric;
@@ -41,9 +42,12 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl;
  * 2. New them in the constructor
  * 3. Register them in registerMetrics(...) by using MetricsCollector's registerMetric(...)
  * 4. Expose methods which could be called externally to change the value of metrics
+ *
+ * This is a spout metrics object with more information; it is used in heron core. To switch to
+ * the faster one, replace "new FullSpoutMetrics" with "new SpoutMetrics" in SpoutInstance.java.
  */
 
-public class FullSpoutMetrics extends SpoutMetrics {
+public class FullSpoutMetrics implements ISpoutMetrics {
   private final MultiCountMetric ackCount;
   private final ReducedMetric<MeanReducerState, Number, Double> tupleSize;
   private final MultiReducedMetric<MeanReducerState, Number, Double> completeLatency;
@@ -61,6 +65,18 @@ public class FullSpoutMetrics extends SpoutMetrics {
 
   // The mean # of pending-to-be-acked tuples in spout if acking is enabled
   private final ReducedMetric<MeanReducerState, Number, Double> pendingTuplesCount;
+  /*
+   * Metrics for how many times spout instance task is run.
+   */
+  private CumulativeCountMetric taskRunCount;
+  /*
+   * Metrics for how many times spout produceTuple is called.
+   */
+  private CumulativeCountMetric produceTupleCount;
+  /*
+   * Metrics for how many times spout continue work is true.
+   */
+  private CumulativeCountMetric continueWorkCount;
 
   public FullSpoutMetrics() {
     ackCount = new MultiCountMetric();
@@ -76,6 +92,9 @@ public class FullSpoutMetrics extends SpoutMetrics {
     serializationTimeNs = new MultiCountMetric();
     tupleAddedToQueue = new CountMetric();
     tupleSize = new ReducedMetric<>(new MeanReducer());
+    taskRunCount = new CumulativeCountMetric();
+    produceTupleCount = new CumulativeCountMetric();
+    continueWorkCount = new CumulativeCountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -96,6 +115,9 @@ public class FullSpoutMetrics extends SpoutMetrics {
     topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
     topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs,
         interval);
+    topologyContext.registerMetric("__task-run-count", taskRunCount, interval);
+    topologyContext.registerMetric("__produce-tuple-count", produceTupleCount, interval);
+    topologyContext.registerMetric("__continue-work-count", continueWorkCount, interval);
 
     // The following metrics measure the rate at which tuples are added to the outgoing
     // queues at spouts and the sizes of these queues. This allows us to measure whether
@@ -164,5 +186,17 @@ public class FullSpoutMetrics extends SpoutMetrics {
   public void serializeDataTuple(String streamId, long latency) {
     serializationTimeNs.scope(streamId).incrBy(latency);
   }
+
+  public void updateTaskRunCount() {
+    taskRunCount.incr();
+  }
+
+  public void updateProduceTupleCount() {
+    produceTupleCount.incr();
+  }
+
+  public void updateContinueWorkCount() {
+    continueWorkCount.incr();
+  }
 }
 
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/IBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/IBoltMetrics.java
new file mode 100644
index 0000000..a81ae2e
--- /dev/null
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/IBoltMetrics.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.common.utils.metrics;
+
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.common.utils.topology.TopologyContextImpl;
+
+/**
+ * Bolt's metrics to be collected.
+ * We need to:
+ * 1. Define the metrics to be collected
+ * 2. New them in the constructor
+ * 3. Register them in registerMetrics(...) by using MetricsCollector's registerMetric(...)
+ * 4. Expose methods which could be called externally to change the value of metrics
+ */
+
+public interface IBoltMetrics extends ComponentMetrics {
+  void registerMetrics(TopologyContextImpl topologyContext);
+
+  // For MultiCountMetrics, we need to set the default value for all streams.
+  // Otherwise, it is possible one metric for a particular stream is null.
+  // For instance, the fail-count on a particular stream could be undefined
+  // causing the metrics not to be exported.
+  // However, it will not set the Multi Reduced/Assignable Metrics,
+  // since we could not have default values for them
+  void initMultiCountMetrics(PhysicalPlanHelper helper);
+
+  void ackedTuple(String streamId, String sourceComponent, long latency);
+
+  void failedTuple(String streamId, String sourceComponent, long latency);
+
+  void executeTuple(String streamId, String sourceComponent, long latency);
+
+  void updateOutQueueFullCount();
+
+  void deserializeDataTuple(String streamId, String sourceComponent, long latency);
+
+  void serializeDataTuple(String streamId, long latency);
+
+  void updateTaskRunCount();
+
+  void updateExecutionCount();
+
+  void updateContinueWorkCount();
+}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/ISpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/ISpoutMetrics.java
new file mode 100644
index 0000000..e875b05
--- /dev/null
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/ISpoutMetrics.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.common.utils.metrics;
+
+import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
+import org.apache.heron.common.utils.topology.TopologyContextImpl;
+
+
+/**
+ * Spout's metrics to be collected.
+ * We need to:
+ * 1. Define the metrics to be collected
+ * 2. New them in the constructor
+ * 3. Register them in registerMetrics(...) by using MetricsCollector's registerMetric(...)
+ * 4. Expose methods which could be called externally to change the value of metrics
+ */
+
+public interface ISpoutMetrics extends ComponentMetrics {
+
+  void registerMetrics(TopologyContextImpl topologyContext);
+
+  // For MultiCountMetrics, we need to set the default value for all streams.
+  // Otherwise, it is possible one metric for a particular stream is null.
+  // For instance, the fail-count on a particular stream could be undefined
+  // causing the metrics not to be exported.
+  // However, it will not set the Multi Reduced/Assignable Metrics,
+  // since we could not have default values for them
+  void initMultiCountMetrics(PhysicalPlanHelper helper);
+
+  void ackedTuple(String streamId, long latency);
+
+  void failedTuple(String streamId, long latency);
+
+  void timeoutTuple(String streamId);
+
+  void nextTuple(long latency);
+
+  void updateOutQueueFullCount();
+
+  void updatePendingTuplesCount(long count);
+
+  void updateTaskRunCount();
+
+  void updateProduceTupleCount();
+
+  void updateContinueWorkCount();
+}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
index 1c98521..0a2bfc9 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/MetricsCollector.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.logging.Logger;
 
+import org.apache.heron.api.metric.CumulativeCountMetric;
 import org.apache.heron.api.metric.IMetric;
 import org.apache.heron.api.metric.IMetricsRegister;
 import org.apache.heron.common.basics.Communicator;
@@ -41,10 +42,12 @@ import org.apache.heron.proto.system.Metrics;
  */
 public class MetricsCollector implements IMetricsRegister {
   private static final Logger LOG = Logger.getLogger(MetricsCollector.class.getName());
+  private static final String COLLECTION_COUNT_NAME = "__collector-collection-count";
 
   private Map<String, IMetric<?>> metrics;
   private Map<Integer, List<String>> timeBucketToMetricNames;
   private WakeableLooper runnableToGatherMetrics;
+  private CumulativeCountMetric metricCollectionCount;
 
   private Communicator<Metrics.MetricPublisherPublishMessage> queue;
 
@@ -54,6 +57,7 @@ public class MetricsCollector implements IMetricsRegister {
     timeBucketToMetricNames = new HashMap<>();
     this.queue = queue;
     this.runnableToGatherMetrics = runnableToGatherMetrics;
+    metricCollectionCount = new CumulativeCountMetric();
   }
 
   @Override
@@ -108,11 +112,14 @@ public class MetricsCollector implements IMetricsRegister {
       for (String metricName : metricNames) {
         gatherOneMetric(metricName, builder);
       }
+    }
 
-      Metrics.MetricPublisherPublishMessage msg = builder.build();
+    metricCollectionCount.incr();
+    addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME, metricCollectionCount);
 
-      queue.offer(msg);
-    }
+    Metrics.MetricPublisherPublishMessage msg = builder.build();
+
+    queue.offer(msg);
   }
 
   private void addDataToMetricPublisher(Metrics.MetricPublisherPublishMessage.Builder builder,
@@ -145,6 +152,10 @@ public class MetricsCollector implements IMetricsRegister {
         gatherOneMetric(metricName, builder);
       }
 
+      metricCollectionCount.incr();
+      addDataToMetricPublisher(builder, COLLECTION_COUNT_NAME,
+                               metricCollectionCount.getValueAndReset());
+
       Metrics.MetricPublisherPublishMessage msg = builder.build();
 
       queue.offer(msg);
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
index cbda4c9..0d2b862 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
@@ -20,6 +20,7 @@
 package org.apache.heron.common.utils.metrics;
 
 import org.apache.heron.api.metric.CountMetric;
+import org.apache.heron.api.metric.CumulativeCountMetric;
 import org.apache.heron.api.metric.MeanReducer;
 import org.apache.heron.api.metric.MeanReducerState;
 import org.apache.heron.api.metric.ReducedMetric;
@@ -36,9 +37,12 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl;
  * 2. New them in the constructor
  * 3. Register them in registerMetrics(...) by using MetricsCollector's registerMetric(...)
  * 4. Expose methods which could be called externally to change the value of metrics
+ *
+ * This is a fast spout metrics object and it is NOT used in heron core. To use this metrics
+ * object, go to SpoutInstance.java and replace "new FullSpoutMetrics" with "new SpoutMetrics".
  */
 
-public class SpoutMetrics implements ComponentMetrics {
+public class SpoutMetrics implements ISpoutMetrics {
   private final CountMetric ackCount;
   private final ReducedMetric<MeanReducerState, Number, Double> completeLatency;
   private final ReducedMetric<MeanReducerState, Number, Double> failLatency;
@@ -55,6 +59,18 @@ public class SpoutMetrics implements ComponentMetrics {
 
   // The mean # of pending-to-be-acked tuples in spout if acking is enabled
   private final ReducedMetric<MeanReducerState, Number, Double> pendingTuplesCount;
+  /*
+   * Metrics for how many times spout instance task is run.
+   */
+  private CumulativeCountMetric taskRunCount;
+  /*
+   * Metrics for how many times spout produceTuple is called.
+   */
+  private CumulativeCountMetric produceTupleCount;
+  /*
+   * Metrics for how many times spout continue work is true.
+   */
+  private CumulativeCountMetric continueWorkCount;
 
   public SpoutMetrics() {
     ackCount = new CountMetric();
@@ -68,6 +84,9 @@ public class SpoutMetrics implements ComponentMetrics {
     outQueueFullCount = new CountMetric();
     pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
     tupleAddedToQueue = new CountMetric();
+    taskRunCount = new CumulativeCountMetric();
+    produceTupleCount = new CumulativeCountMetric();
+    continueWorkCount = new CumulativeCountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -88,6 +107,9 @@ public class SpoutMetrics implements ComponentMetrics {
     topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
     topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
         tupleAddedToQueue, interval);
+    topologyContext.registerMetric("__task-run-count", taskRunCount, interval);
+    topologyContext.registerMetric("__produce-tuple-count", produceTupleCount, interval);
+    topologyContext.registerMetric("__continue-work-count", continueWorkCount, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -137,4 +159,16 @@ public class SpoutMetrics implements ComponentMetrics {
 
   public void serializeDataTuple(String streamId, long latency) {
   }
+
+  public void updateTaskRunCount() {
+    taskRunCount.incr();
+  }
+
+  public void updateProduceTupleCount() {
+    produceTupleCount.incr();
+  }
+
+  public void updateContinueWorkCount() {
+    continueWorkCount.incr();
+  }
 }
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index 68193d2..893ed01 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -45,8 +45,8 @@ import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.basics.SlaveLooper;
 import org.apache.heron.common.basics.TypeUtils;
 import org.apache.heron.common.config.SystemConfig;
-import org.apache.heron.common.utils.metrics.BoltMetrics;
 import org.apache.heron.common.utils.metrics.FullBoltMetrics;
+import org.apache.heron.common.utils.metrics.IBoltMetrics;
 import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
 import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
 import org.apache.heron.common.utils.topology.TopologyContextImpl;
@@ -64,7 +64,7 @@ public class BoltInstance implements IInstance {
   protected final IBolt bolt;
   protected final BoltOutputCollectorImpl collector;
   protected final IPluggableSerializer serializer;
-  protected final BoltMetrics boltMetrics;
+  protected final IBoltMetrics boltMetrics;
   // The bolt will read Data tuples from streamInQueue
   private final Communicator<Message> streamInQueue;
 
@@ -237,8 +237,10 @@ public class BoltInstance implements IInstance {
     Runnable boltTasks = new Runnable() {
       @Override
       public void run() {
+        boltMetrics.updateTaskRunCount();
         // Back-pressure -- only when we could send out tuples will we read & execute tuples
         if (collector.isOutQueuesAvailable()) {
+          boltMetrics.updateExecutionCount();
           readTuplesAndExecute(streamInQueue);
 
           // Though we may execute MAX_READ tuples, finally we will packet it as
@@ -251,6 +253,7 @@ public class BoltInstance implements IInstance {
 
         // If there are more to read, we will wake up itself next time when it doWait()
         if (collector.isOutQueuesAvailable() && !streamInQueue.isEmpty()) {
+          boltMetrics.updateContinueWorkCount();
           looper.wakeUp();
         }
       }
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltOutputCollectorImpl.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltOutputCollectorImpl.java
index 4fd14a3..170bcfa 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltOutputCollectorImpl.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltOutputCollectorImpl.java
@@ -33,7 +33,7 @@ import org.apache.heron.api.bolt.IOutputCollector;
 import org.apache.heron.api.serializer.IPluggableSerializer;
 import org.apache.heron.api.tuple.Tuple;
 import org.apache.heron.common.basics.Communicator;
-import org.apache.heron.common.utils.metrics.BoltMetrics;
+import org.apache.heron.common.utils.metrics.IBoltMetrics;
 import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
 import org.apache.heron.common.utils.tuple.TupleImpl;
 import org.apache.heron.instance.AbstractOutputCollector;
@@ -61,12 +61,12 @@ public class BoltOutputCollectorImpl extends AbstractOutputCollector implements
   private static final Logger LOG = Logger.getLogger(BoltOutputCollectorImpl.class.getName());
 
   // Reference to update the bolt metrics
-  private final BoltMetrics boltMetrics;
+  private final IBoltMetrics boltMetrics;
 
   protected BoltOutputCollectorImpl(IPluggableSerializer serializer,
                                     PhysicalPlanHelper helper,
                                     Communicator<Message> streamOutQueue,
-                                    BoltMetrics boltMetrics) {
+                                    IBoltMetrics boltMetrics) {
     super(serializer, helper, streamOutQueue, boltMetrics);
     this.boltMetrics = boltMetrics;
 
diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index 5fc1fdb..348a641 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -46,7 +46,7 @@ import org.apache.heron.common.basics.SlaveLooper;
 import org.apache.heron.common.basics.TypeUtils;
 import org.apache.heron.common.config.SystemConfig;
 import org.apache.heron.common.utils.metrics.FullSpoutMetrics;
-import org.apache.heron.common.utils.metrics.SpoutMetrics;
+import org.apache.heron.common.utils.metrics.ISpoutMetrics;
 import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
 import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
 import org.apache.heron.common.utils.topology.TopologyContextImpl;
@@ -60,7 +60,7 @@ public class SpoutInstance implements IInstance {
 
   protected final ISpout spout;
   protected final SpoutOutputCollectorImpl collector;
-  protected final SpoutMetrics spoutMetrics;
+  protected final ISpoutMetrics spoutMetrics;
   // The spout will read Control tuples from streamInQueue
   private final Communicator<Message> streamInQueue;
 
@@ -255,8 +255,11 @@ public class SpoutInstance implements IInstance {
     Runnable spoutTasks = new Runnable() {
       @Override
       public void run() {
+        spoutMetrics.updateTaskRunCount();
+
         // Check whether we should produce more tuples
         if (isProduceTuple()) {
+          spoutMetrics.updateProduceTupleCount();
           produceTuple();
           // Though we may execute MAX_READ tuples, finally we will packet it as
           // one outgoingPacket and push to out queues
@@ -281,6 +284,7 @@ public class SpoutInstance implements IInstance {
 
         // If we have more work to do
         if (isContinueWork()) {
+          spoutMetrics.updateContinueWorkCount();
           looper.wakeUp();
         }
       }