You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/08/09 20:29:42 UTC
[incubator-heron] branch master updated: Adding average
Serialization/Deserialization time per tuple (#2976)
This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 2b5ab99 Adding average Serialization/Deserialization time per tuple (#2976)
2b5ab99 is described below
commit 2b5ab994fe43a33ca42d492df8d5fb5b72ebf523
Author: Faria Kalim <fa...@gmail.com>
AuthorDate: Thu Aug 9 13:29:39 2018 -0700
Adding average Serialization/Deserialization time per tuple (#2976)
* adding metric to measure average serialization and deserialization times
* adding a metric that measures average serialization and deserialization time per tuple
---
.../common/utils/metrics/FullBoltMetrics.java | 31 +++++++++++++++-------
.../apache/heron/instance/bolt/BoltInstance.java | 15 ++++++-----
2 files changed, 31 insertions(+), 15 deletions(-)
diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
index 320bef0..84700d3 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
@@ -52,8 +52,10 @@ public class FullBoltMetrics extends BoltMetrics {
// Time in nanoseconds spent in execute() during each interval
private final MultiCountMetric executeTimeNs;
private final MultiCountMetric emitCount;
- private final MultiCountMetric deserializationTimeNs;
- private final MultiCountMetric serializationTimeNs;
+ private final MultiCountMetric totalDeserializationTimeNs;
+ private final MultiCountMetric totalSerializationTimeNs;
+ private final MultiReducedMetric<MeanReducerState, Number, Double> averageSerializationTimeNs;
+ private final MultiReducedMetric<MeanReducerState, Number, Double> averageDeserializationTimeNs;
// The # of times back-pressure happens on outStreamQueue
// so instance could not produce more tuples
@@ -71,8 +73,11 @@ public class FullBoltMetrics extends BoltMetrics {
emitCount = new MultiCountMetric();
outQueueFullCount = new CountMetric();
- deserializationTimeNs = new MultiCountMetric();
- serializationTimeNs = new MultiCountMetric();
+ totalDeserializationTimeNs = new MultiCountMetric();
+ totalSerializationTimeNs = new MultiCountMetric();
+
+ averageSerializationTimeNs = new MultiReducedMetric<>(new MeanReducer());
+ averageDeserializationTimeNs = new MultiReducedMetric<>(new MeanReducer());
}
public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -91,8 +96,13 @@ public class FullBoltMetrics extends BoltMetrics {
topologyContext.registerMetric("__emit-count", emitCount, interval);
topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
topologyContext.registerMetric(
- "__tuple-deserialization-time-ns", deserializationTimeNs, interval);
- topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, interval);
+ "__tuple-deserialization-time-ns", totalDeserializationTimeNs, interval);
+ topologyContext.registerMetric(
+ "__tuple-serialization-time-ns", totalSerializationTimeNs, interval);
+ topologyContext.registerMetric(
+ "__av-tuple-deserialization-time-ns", averageDeserializationTimeNs, interval);
+ topologyContext.registerMetric(
+ "__av-tuple-serialization-time-ns", averageSerializationTimeNs, interval);
}
// For MultiCountMetrics, we need to set the default value for all streams.
@@ -174,17 +184,20 @@ public class FullBoltMetrics extends BoltMetrics {
}
public void deserializeDataTuple(String streamId, String sourceComponent, long latency) {
- deserializationTimeNs.scope(streamId).incrBy(latency);
+ totalDeserializationTimeNs.scope(streamId).incrBy(latency);
+ averageDeserializationTimeNs.scope(streamId).update(latency);
// Different streams may share the same streamId while coming from different
// source components, so we also record the metric under a per-source global stream id.
String globalStreamId =
new StringBuilder(sourceComponent).append("/").append(streamId).toString();
- deserializationTimeNs.scope(globalStreamId).incrBy(latency);
+ totalDeserializationTimeNs.scope(globalStreamId).incrBy(latency);
+ averageDeserializationTimeNs.scope(globalStreamId).update(latency);
}
public void serializeDataTuple(String streamId, long latency) {
- serializationTimeNs.scope(streamId).incrBy(latency);
+ totalSerializationTimeNs.scope(streamId).incrBy(latency);
+ averageSerializationTimeNs.scope(streamId).update(latency);
}
}
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index f58e181..68193d2 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -291,7 +291,7 @@ public class BoltInstance implements IInstance {
int sourceTaskId = tuples.getSrcTaskId();
for (HeronTuples.HeronDataTuple dataTuple : tuples.getData().getTuplesList()) {
- long startExecuteTuple = System.nanoTime();
+ long startTime = System.nanoTime();
// Create the value list and fill the value
List<Object> values = new ArrayList<>(nValues);
for (int i = 0; i < nValues; i++) {
@@ -300,20 +300,23 @@ public class BoltInstance implements IInstance {
// Decode the tuple
TupleImpl t = new TupleImpl(topologyContext, stream, dataTuple.getKey(),
- dataTuple.getRootsList(), values, startExecuteTuple, false, sourceTaskId);
+ dataTuple.getRootsList(), values, System.nanoTime(), false, sourceTaskId);
+
+ long deserializedTime = System.nanoTime();
// Delegate to the user-defined bolt
bolt.execute(t);
- // record the end of a tuple execution
- long endExecuteTuple = System.nanoTime();
-
- long executeLatency = endExecuteTuple - startExecuteTuple;
+ // record the latency of execution
+ long executeLatency = Duration.ofNanos(System.nanoTime()).
+ minusNanos(deserializedTime).toNanos();
// Invoke user-defined execute task hook
topologyContext.invokeHookBoltExecute(t, Duration.ofNanos(executeLatency));
// Update metrics
+ boltMetrics.deserializeDataTuple(stream.getId(), stream.getComponentName(),
+ deserializedTime - startTime);
boltMetrics.executeTuple(stream.getId(), stream.getComponentName(), executeLatency);
}