You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by nl...@apache.org on 2018/08/09 20:29:42 UTC

[incubator-heron] branch master updated: Adding average Serialization/Deserialization time per tuple (#2976)

This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b5ab99  Adding average Serialization/Deserialization time per tuple (#2976)
2b5ab99 is described below

commit 2b5ab994fe43a33ca42d492df8d5fb5b72ebf523
Author: Faria Kalim <fa...@gmail.com>
AuthorDate: Thu Aug 9 13:29:39 2018 -0700

    Adding average Serialization/Deserialization time per tuple (#2976)
    
    * adding metric to measure average serialization and deserialization times
    
    * adding a metric that measures average serialization and deserialization time per tuple
---
 .../common/utils/metrics/FullBoltMetrics.java      | 31 +++++++++++++++-------
 .../apache/heron/instance/bolt/BoltInstance.java   | 15 ++++++-----
 2 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
index 320bef0..84700d3 100644
--- a/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
+++ b/heron/common/src/java/org/apache/heron/common/utils/metrics/FullBoltMetrics.java
@@ -52,8 +52,10 @@ public class FullBoltMetrics extends BoltMetrics {
   // Time in nano-seconds spending in execute() at every interval
   private final MultiCountMetric executeTimeNs;
   private final MultiCountMetric emitCount;
-  private final MultiCountMetric deserializationTimeNs;
-  private final MultiCountMetric serializationTimeNs;
+  private final MultiCountMetric totalDeserializationTimeNs;
+  private final MultiCountMetric totalSerializationTimeNs;
+  private final MultiReducedMetric<MeanReducerState, Number, Double> averageDeserializationTimeNs;
+  private final MultiReducedMetric<MeanReducerState, Number, Double> averageSerializationTimeNs;

   // The # of times back-pressure happens on outStreamQueue
   // so instance could not produce more tuples
@@ -71,8 +73,11 @@ public class FullBoltMetrics extends BoltMetrics {
     emitCount = new MultiCountMetric();
     outQueueFullCount = new CountMetric();

-    deserializationTimeNs = new MultiCountMetric();
-    serializationTimeNs = new MultiCountMetric();
+    totalDeserializationTimeNs = new MultiCountMetric();
+    averageDeserializationTimeNs = new MultiReducedMetric<>(new MeanReducer());
+
+    totalSerializationTimeNs = new MultiCountMetric();
+    averageSerializationTimeNs = new MultiReducedMetric<>(new MeanReducer());
   }

   public void registerMetrics(TopologyContextImpl topologyContext) {
@@ -91,8 +96,13 @@ public class FullBoltMetrics extends BoltMetrics {
     topologyContext.registerMetric("__emit-count", emitCount, interval);
     topologyContext.registerMetric("__out-queue-full-count", outQueueFullCount, interval);
     topologyContext.registerMetric(
-        "__tuple-deserialization-time-ns", deserializationTimeNs, interval);
-    topologyContext.registerMetric("__tuple-serialization-time-ns", serializationTimeNs, interval);
+        "__tuple-deserialization-time-ns", totalDeserializationTimeNs, interval);
+    topologyContext.registerMetric(
+        "__tuple-serialization-time-ns", totalSerializationTimeNs, interval);
+    topologyContext.registerMetric(
+        "__av-tuple-deserialization-time-ns", averageDeserializationTimeNs, interval);
+    topologyContext.registerMetric(
+        "__av-tuple-serialization-time-ns", averageSerializationTimeNs, interval);
   }

   // For MultiCountMetrics, we need to set the default value for all streams.
@@ -174,17 +184,20 @@ public class FullBoltMetrics extends BoltMetrics {
   }

   public void deserializeDataTuple(String streamId, String sourceComponent, long latency) {
-    deserializationTimeNs.scope(streamId).incrBy(latency);
+    averageDeserializationTimeNs.scope(streamId).update(latency);
+    totalDeserializationTimeNs.scope(streamId).incrBy(latency);

     // Consider there are cases that different streams with the same streamId,
     // but with different source component. We need to distinguish them too.
     String globalStreamId =
         new StringBuilder(sourceComponent).append("/").append(streamId).toString();
-    deserializationTimeNs.scope(globalStreamId).incrBy(latency);
+    averageDeserializationTimeNs.scope(globalStreamId).update(latency);
+    totalDeserializationTimeNs.scope(globalStreamId).incrBy(latency);
   }

   public void serializeDataTuple(String streamId, long latency) {
-    serializationTimeNs.scope(streamId).incrBy(latency);
+    averageSerializationTimeNs.scope(streamId).update(latency);
+    totalSerializationTimeNs.scope(streamId).incrBy(latency);
   }
 }

diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index f58e181..68193d2 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -291,7 +291,7 @@ public class BoltInstance implements IInstance {
         int sourceTaskId = tuples.getSrcTaskId();

         for (HeronTuples.HeronDataTuple dataTuple : tuples.getData().getTuplesList()) {
-          long startExecuteTuple = System.nanoTime();
+          long startTime = System.nanoTime();  // start of per-tuple deserialization
           // Create the value list and fill the value
           List<Object> values = new ArrayList<>(nValues);
           for (int i = 0; i < nValues; i++) {
@@ -300,20 +300,23 @@ public class BoltInstance implements IInstance {

           // Decode the tuple
           TupleImpl t = new TupleImpl(topologyContext, stream, dataTuple.getKey(),
-              dataTuple.getRootsList(), values, startExecuteTuple, false, sourceTaskId);
+              dataTuple.getRootsList(), values, System.nanoTime(), false, sourceTaskId);
+
+          long deserializedTime = System.nanoTime();

           // Delegate to the use defined bolt
           bolt.execute(t);

-          // record the end of a tuple execution
-          long endExecuteTuple = System.nanoTime();
-
-          long executeLatency = endExecuteTuple - startExecuteTuple;
+          // record the latency of execute() alone, excluding deserialization
+          long endExecuteTuple = System.nanoTime();
+          long executeLatency = endExecuteTuple - deserializedTime;

           // Invoke user-defined execute task hook
           topologyContext.invokeHookBoltExecute(t, Duration.ofNanos(executeLatency));

           // Update metrics
+          boltMetrics.deserializeDataTuple(stream.getId(), stream.getComponentName(),
+              deserializedTime - startTime);
           boltMetrics.executeTuple(stream.getId(), stream.getComponentName(), executeLatency);
         }