You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ka...@apache.org on 2018/08/17 15:37:53 UTC
[incubator-heron] branch master updated: Metrics added to measure
the rate at which tuples are added to the queue at spout gateway (#2981)
This is an automated email from the ASF dual-hosted git repository.
karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new d669d06 Metrics added to measure the rate at which tuples are added to the queue at spout gateway (#2981)
d669d06 is described below
commit d669d0615198ce36a8cdebadd184ba4c3bcc9e49
Author: Faria Kalim <fa...@gmail.com>
AuthorDate: Fri Aug 17 08:37:51 2018 -0700
Metrics added to measure the rate at which tuples are added to the queue at spout gateway (#2981)
* added metrics to measure tuples added to outgoing queue per min
* added metric to full metrics instance
* added cumulative outgoing queue size
* measuring number of tuples per tupleset
* added comment justifying use of metrics
* removed extra comment
---
.../heron/common/utils/metrics/BoltMetrics.java | 10 +++++++++-
.../common/utils/metrics/ComponentMetrics.java | 2 +-
.../common/utils/metrics/FullBoltMetrics.java | 9 ++++++++-
.../common/utils/metrics/FullSpoutMetrics.java | 22 ++++++++++++++++++++--
.../heron/common/utils/metrics/SpoutMetrics.java | 8 ++++++++
.../heron/instance/AbstractOutputCollector.java | 2 +-
.../heron/instance/OutgoingTupleCollection.java | 8 +++++++-
.../org/apache/heron/metrics/GatewayMetrics.java | 1 -
8 files changed, 54 insertions(+), 8 deletions(-)
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
index 13006fb..bd40180 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/BoltMetrics.java
@@ -44,7 +44,7 @@ public class BoltMetrics implements ComponentMetrics {
private final CountMetric failCount;
private final CountMetric executeCount;
private final ReducedMetric<MeanReducerState, Number, Double> executeLatency;
-
+ private final CountMetric tupleAddedToQueue;
// Time in nano-seconds spending in execute() at every interval
private final CountMetric emitCount;
@@ -53,6 +53,7 @@ public class BoltMetrics implements ComponentMetrics {
private final CountMetric outQueueFullCount;
+
public BoltMetrics() {
ackCount = new CountMetric();
processLatency = new ReducedMetric<>(new MeanReducer());
@@ -62,6 +63,7 @@ public class BoltMetrics implements ComponentMetrics {
executeLatency = new ReducedMetric<>(new MeanReducer());
emitCount = new CountMetric();
outQueueFullCount = new CountMetric();
+ tupleAddedToQueue = new CountMetric();
}
public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -78,6 +80,8 @@ public class BoltMetrics implements ComponentMetrics {
topologyContext.registerMetric("__execute-latency/default", executeLatency, interval);
topologyContext.registerMetric("__emit-count/default", emitCount, interval);
topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
+ topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+ tupleAddedToQueue, interval);
}
// For MultiCountMetrics, we need to set the default value for all streams.
@@ -109,6 +113,10 @@ public class BoltMetrics implements ComponentMetrics {
emitCount.incr();
}
+ public void addTupleToQueue(int size) {
+ tupleAddedToQueue.incr();
+ }
+
public void updateOutQueueFullCount() {
outQueueFullCount.incr();
}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
index 81728f6..d9e3b23 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/ComponentMetrics.java
@@ -28,7 +28,7 @@ import org.apache.heron.classification.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface ComponentMetrics {
-
void serializeDataTuple(String streamId, long latency);
void emittedTuple(String streamId);
+ void addTupleToQueue(int size);
}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
index 84700d3..dfda1c5 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
@@ -52,6 +52,7 @@ public class FullBoltMetrics extends BoltMetrics {
// Time in nano-seconds spending in execute() at every interval
private final MultiCountMetric executeTimeNs;
private final MultiCountMetric emitCount;
+ private final CountMetric tupleAddedToQueue;
private final MultiCountMetric totalDeserializationTimeNs;
private final MultiCountMetric totalSerializationTimeNs;
private final MultiReducedMetric<MeanReducerState, Number, Double> averageSerializationTimeNs;
@@ -61,7 +62,6 @@ public class FullBoltMetrics extends BoltMetrics {
// so instance could not produce more tuples
private final CountMetric outQueueFullCount;
-
public FullBoltMetrics() {
ackCount = new MultiCountMetric();
processLatency = new MultiReducedMetric<>(new MeanReducer());
@@ -72,6 +72,7 @@ public class FullBoltMetrics extends BoltMetrics {
executeTimeNs = new MultiCountMetric();
emitCount = new MultiCountMetric();
outQueueFullCount = new CountMetric();
+ tupleAddedToQueue = new CountMetric();
totalDeserializationTimeNs = new MultiCountMetric();
totalSerializationTimeNs = new MultiCountMetric();
@@ -103,6 +104,8 @@ public class FullBoltMetrics extends BoltMetrics {
"__av-tuple-deserialization-time-ns", totalDeserializationTimeNs, interval);
topologyContext.registerMetric(
"__av-tuple-serialization-time-ns", totalSerializationTimeNs, interval);
+ topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+ tupleAddedToQueue, interval);
}
// For MultiCountMetrics, we need to set the default value for all streams.
@@ -179,6 +182,10 @@ public class FullBoltMetrics extends BoltMetrics {
emitCount.scope(streamId).incr();
}
+ public void addTupleToQueue(int size) {
+ tupleAddedToQueue.incr();
+ }
+
public void updateOutQueueFullCount() {
outQueueFullCount.incr();
}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
index 412b858..866bdf2 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullSpoutMetrics.java
@@ -45,6 +45,7 @@ import org.apache.heron.common.utils.topology.TopologyContextImpl;
public class FullSpoutMetrics extends SpoutMetrics {
private final MultiCountMetric ackCount;
+ private final ReducedMetric<MeanReducerState, Number, Double> tupleSize;
private final MultiReducedMetric<MeanReducerState, Number, Double> completeLatency;
private final MultiReducedMetric<MeanReducerState, Number, Double> failLatency;
private final MultiCountMetric failCount;
@@ -53,7 +54,7 @@ public class FullSpoutMetrics extends SpoutMetrics {
private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency;
private final CountMetric nextTupleCount;
private final MultiCountMetric serializationTimeNs;
-
+ private final CountMetric tupleAddedToQueue;
// The # of times back-pressure happens on outStreamQueue so instance could not
// produce more tuples
private final CountMetric outQueueFullCount;
@@ -73,6 +74,8 @@ public class FullSpoutMetrics extends SpoutMetrics {
outQueueFullCount = new CountMetric();
pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
serializationTimeNs = new MultiCountMetric();
+ tupleAddedToQueue = new CountMetric();
+ tupleSize = new ReducedMetric<>(new MeanReducer());
}
public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -91,7 +94,17 @@ public class FullSpoutMetrics extends SpoutMetrics {
topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval);
topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
- topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, interval);
+ topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs,
+ interval);
+
+ // The following metrics measure the rate at which tuples are added to the outgoing
+ // queues at spouts and the sizes of these queues. This allows us to measure whether
+ // the gateway thread pulls out tuples from the queue fast enough, thereby preventing
+ // the spout from becoming a bottleneck.
+ topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+ tupleAddedToQueue, interval);
+ topologyContext.registerMetric("__average-tuple-size-added-queue/default",
+ tupleSize, interval);
}
// For MultiCountMetrics, we need to set the default value for all streams.
@@ -135,6 +148,11 @@ public class FullSpoutMetrics extends SpoutMetrics {
nextTupleCount.incr();
}
+ public void addTupleToQueue(int size) {
+ tupleAddedToQueue.incr();
+ tupleSize.update(size);
+ }
+
public void updateOutQueueFullCount() {
outQueueFullCount.incr();
}
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
index 7ff67d0..cbda4c9 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/SpoutMetrics.java
@@ -47,6 +47,7 @@ public class SpoutMetrics implements ComponentMetrics {
private final CountMetric emitCount;
private final ReducedMetric<MeanReducerState, Number, Double> nextTupleLatency;
private final CountMetric nextTupleCount;
+ private final CountMetric tupleAddedToQueue;
// The # of times back-pressure happens on outStreamQueue so instance could not
// produce more tuples
@@ -66,6 +67,7 @@ public class SpoutMetrics implements ComponentMetrics {
nextTupleCount = new CountMetric();
outQueueFullCount = new CountMetric();
pendingTuplesCount = new ReducedMetric<>(new MeanReducer());
+ tupleAddedToQueue = new CountMetric();
}
public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -84,6 +86,8 @@ public class SpoutMetrics implements ComponentMetrics {
topologyContext.registerMetric("__next-tuple-count", nextTupleCount, interval);
topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
topologyContext.registerMetric("__pending-acked-count", pendingTuplesCount, interval);
+ topologyContext.registerMetric("__data-tuple-added-to-outgoing-queue/default",
+ tupleAddedToQueue, interval);
}
// For MultiCountMetrics, we need to set the default value for all streams.
@@ -114,6 +118,10 @@ public class SpoutMetrics implements ComponentMetrics {
emitCount.incr();
}
+ public void addTupleToQueue(int size) {
+ tupleAddedToQueue.incr();
+ }
+
public void nextTuple(long latency) {
nextTupleLatency.update(latency);
nextTupleCount.incr();
diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
index 08afa61..3e22ac8 100644
--- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
@@ -81,7 +81,7 @@ public class AbstractOutputCollector {
}
}
- this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock);
+ this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock, metrics);
}
public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
index 862c300..a3c1e84 100644
--- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
@@ -35,6 +35,7 @@ import org.apache.heron.common.basics.Communicator;
import org.apache.heron.common.basics.FileUtils;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
+import org.apache.heron.common.utils.metrics.ComponentMetrics;
import org.apache.heron.common.utils.misc.PhysicalPlanHelper;
import org.apache.heron.common.utils.misc.SerializeDeSerializeHelper;
import org.apache.heron.proto.ckptmgr.CheckpointManager;
@@ -71,12 +72,16 @@ public class OutgoingTupleCollection {
private final ReentrantLock lock;
+ protected final ComponentMetrics metrics;
+
public OutgoingTupleCollection(
PhysicalPlanHelper helper,
Communicator<Message> outQueue,
- ReentrantLock lock) {
+ ReentrantLock lock,
+ ComponentMetrics metrics) {
this.outQueue = outQueue;
this.helper = helper;
+ this.metrics = metrics;
SystemConfig systemConfig =
(SystemConfig) SingletonRegistry.INSTANCE.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG);
@@ -238,6 +243,7 @@ public class OutgoingTupleCollection {
bldr.setData(currentDataTuple);
pushTupleToQueue(bldr, outQueue);
+ metrics.addTupleToQueue(currentDataTuple.getTuplesCount());
currentDataTuple = null;
}
diff --git a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
index 64b11bb..8bb40c3 100644
--- a/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
+++ b/heron/instance/src/java/org/apache/heron/metrics/GatewayMetrics.java
@@ -46,7 +46,6 @@ public class GatewayMetrics {
private final CountMetric sentMetricsSize;
private final CountMetric sentMetricsCount;
private final CountMetric sentExceptionsCount;
-
// The # of items in inStreamQueue
private final ReducedMetric<MeanReducerState, Number, Double> inStreamQueueSize;
// The # of items in outStreamQueue