You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by GitBox <gi...@apache.org> on 2018/08/17 15:37:53 UTC

[GitHub] kramasamy closed pull request #2981: Metrics added to measure the rate at which tuples are added to the queue at spout gateway

kramasamy closed pull request #2981: Metrics added to measure the rate at which tuples are added to the queue at spout gateway
URL: https://github.com/apache/incubator-heron/pull/2981
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:

diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
index 13006fb76c..bd401801f1 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
@@ -44,7 +44,7 @@
   private final CountMetric failCount;
   private final CountMetric executeCount;
   private final ReducedMetric<MeanReducerState, Number, Double> executeLatency;
-
+  private final CountMetric tupleAddedToQueue;
   // Time in nano-seconds spending in execute() at every interval
   private final CountMetric emitCount;
 
@@ -53,6 +53,7 @@
   private final CountMetric outQueueFullCount;
 
 
+
   public BoltMetrics() {
     ackCount = new CountMetric();
     processLatency = new ReducedMetric<>(new MeanReducer());
@@ -62,6 +63,7 @@ public BoltMetrics() {
     executeLatency = new ReducedMetric<>(new MeanReducer());
     emitCount = new CountMetric();
     outQueueFullCount = new CountMetric();
+    tupleAddedToQueue = new CountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -78,6 +80,8 @@ public void registerMetrics(TopologyContextImpl topologyContext) {
     topologyContext.registerMetric("__execute-latency/default", executeLatency, interval);
     topologyContext.registerMetric("__emit-count/default", emitCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -109,6 +113,10 @@ public void emittedTuple(String streamId) {
     emitCount.incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+  }
+
   public void updateOutQueueFullCount() {
     outQueueFullCount.incr();
   }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
index 81728f6b17..d9e3b232ed 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
@@ -28,7 +28,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface ComponentMetrics {
-
   void serializeDataTuple(String streamId, long latency);
   void emittedTuple(String streamId);
+  void addTupleToQueue(int size);
 }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
index 84700d371c..dfda1c5bb6 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
@@ -52,6 +52,7 @@
   // Time in nano-seconds spending in execute() at every interval
   private final MultiCountMetric executeTimeNs;
   private final MultiCountMetric emitCount;
+  private final CountMetric tupleAddedToQueue;
   private final MultiCountMetric totalDeserializationTimeNs;
   private final MultiCountMetric totalSerializationTimeNs;
   private final MultiReducedMetric<MeanReducerState, Number, Double> averageSerializationTimeNs;
@@ -61,7 +62,6 @@
   // so instance could not produce more tuples
   private final CountMetric outQueueFullCount;
 
-
   public FullBoltMetrics() {
     ackCount = new MultiCountMetric();
     processLatency = new MultiReducedMetric<>(new MeanReducer());
@@ -72,6 +72,7 @@ public FullBoltMetrics() {
     executeTimeNs = new MultiCountMetric();
     emitCount = new MultiCountMetric();
     outQueueFullCount = new CountMetric();
+    tupleAddedToQueue = new CountMetric();
 
     totalDeserializationTimeNs = new MultiCountMetric();
     totalSerializationTimeNs = new MultiCountMetric();
@@ -103,6 +104,8 @@ public void registerMetrics(TopologyContextImpl topologyContext) {
         "__av-tuple-deserialization-time-ns", totalDeserializationTimeNs, interval);
     topologyContext.registerMetric(
         "__av-tuple-serialization-time-ns", totalSerializationTimeNs, interval);
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -179,6 +182,10 @@ public void emittedTuple(String streamId) {
     emitCount.scope(streamId).incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+  }
+
   public void updateOutQueueFullCount() {
     outQueueFullCount.incr();
   }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
index 412b858f85..866bdf2f13 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
@@ -45,6 +45,7 @@
 
 public class FullSpoutMetrics extends SpoutMetrics {
   private final MultiCountMetric ackCount;
+  private final ReducedMetric<MeanReducerState, Number, Double> tupleSize;
   private final MultiReducedMetric<MeanReducerState, Number, Double> completeLatency;
   private final MultiReducedMetric<MeanReducerState, Number, Double> failLatency;
   private final MultiCountMetric failCount;
@@ -53,7 +54,7 @@
   private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency;
   private final CountMetric nextTupleCount;
   private final MultiCountMetric serializationTimeNs;
-
+  private final CountMetric tupleAddedToQueue;
   // The # of times back-pressure happens on outStreamQueue so instance could not
   // produce more tuples
   private final CountMetric outQueueFullCount;
@@ -73,6 +74,8 @@ public FullSpoutMetrics() {
     outQueueFullCount = new CountMetric();
     pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
     serializationTimeNs = new MultiCountMetric();
+    tupleAddedToQueue = new CountMetric();
+    tupleSize = new ReducedMetric<>(new MeanReducer());
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -91,7 +94,17 @@ public void registerMetrics(TopologyContextImpl topologyContext) {
     topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
     topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
-    topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, interval);
+    topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs,
+        interval);
+
+    // The following metrics measure the rate at which tuple batches are added to the
+    // outgoing queue at spouts and the average number of tuples in each added batch.
+    // This allows us to measure whether the gateway thread pulls out tuples from the
+    // queue fast enough, thereby preventing the spout from becoming a bottleneck.
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
+    topologyContext.registerMetric("__average-tuple-size-added-queue/default",
+        tupleSize, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -135,6 +148,11 @@ public void nextTuple(long latency) {
     nextTupleCount.incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+    tupleSize.update(size);
+  }
+
   public void updateOutQueueFullCount() {
     outQueueFullCount.incr();
   }
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
index 7ff67d05c1..cbda4c9f78 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
@@ -47,6 +47,7 @@
   private final CountMetric emitCount;
   private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency;
   private final CountMetric nextTupleCount;
+  private final CountMetric tupleAddedToQueue;
 
   // The # of times back-pressure happens on outStreamQueue so instance could not
   // produce more tuples
@@ -66,6 +67,7 @@ public SpoutMetrics() {
     nextTupleCount = new CountMetric();
     outQueueFullCount = new CountMetric();
     pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
+    tupleAddedToQueue = new CountMetric();
   }
 
   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -84,6 +86,8 @@ public void registerMetrics(TopologyContextImpl topologyContext) {
     topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
     topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
+    topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+        tupleAddedToQueue, interval);
   }
 
   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -114,6 +118,10 @@ public void emittedTuple(String streamId) {
     emitCount.incr();
   }
 
+  public void addTupleToQueue(int size) {
+    tupleAddedToQueue.incr();
+  }
+
   public void nextTuple(long latency) {
     nextTupleLatency.update(latency);
     nextTupleCount.incr();
diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
index 08afa61aec..3e22ac804c 100644
--- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
@@ -81,7 +81,7 @@ public AbstractOutputCollector(IPluggableSerializer serializer,
       }
     }
 
-    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock);
+    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock, metrics);
   }
 
   public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
index 862c3007c8..a3c1e8477d 100644
--- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
@@ -35,6 +35,7 @@
 import org.apache.heron.common.basics.FileUtils;
 import org.apache.heron.common.basics.SingletonRegistry;
 import org.apache.heron.common.config.SystemConfig;
+import org.apache.heron.common.utils.metrics.ComponentMetrics;
 import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
 import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
 import org.apache.heron.proto.ckptmgr.CheckpointManager;
@@ -71,12 +72,16 @@
 
   private final ReentrantLock lock;
 
+  protected final ComponentMetrics metrics;
+
   public OutgoingTupleCollection(
       PhysicalPlanHelper helper,
       Communicator<Message> outQueue,
-      ReentrantLock lock) {
+      ReentrantLock lock,
+      ComponentMetrics metrics) {
     this.outQueue = outQueue;
     this.helper = helper;
+    this.metrics = metrics;
     SystemConfig systemConfig =
         (SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
 
@@ -238,6 +243,7 @@ private void flushRemaining() {
       bldr.setData(currentDataTuple);
 
       pushTupleToQueue(bldr, outQueue);
+      metrics.addTupleToQueue(currentDataTuple.getTuplesCount());
 
       currentDataTuple = null;
     }
diff --git a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
index 64b11bb0f2..8bb40c3dcd 100644
--- a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
+++ b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
@@ -46,7 +46,6 @@
   private final CountMetric sentMetricsSize;
   private final CountMetric sentMetricsCount;
   private final CountMetric sentExceptionsCount;
-
   // The # of items in inStreamQueue
   private final ReducedMetric<MeanReducerState, Number, Double> inStreamQueueSize;
   // The # of items in outStreamQueue


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services