You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2021/11/18 20:35:30 UTC
[pinot] branch master updated: Split thread cpu time into three metrics (#7724)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9662399 Split thread cpu time into three metrics (#7724)
9662399 is described below
commit 9662399a60689ea305acd51fad0f775824aa18f3
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Thu Nov 18 12:35:12 2021 -0800
Split thread cpu time into three metrics (#7724)
* Split thread CPU time into three metrics
* fix typo
---
.../apache/pinot/broker/api/RequestStatistics.java | 61 +++++++++
.../requesthandler/BaseBrokerRequestHandler.java | 22 +++-
.../apache/pinot/common/metrics/BrokerTimer.java | 22 +++-
.../apache/pinot/common/metrics/ServerTimer.java | 14 +-
.../pinot/common/response/BrokerResponse.java | 94 +++++++++++---
.../response/broker/BrokerResponseNative.java | 143 ++++++++++++++++-----
.../org/apache/pinot/common/utils/DataTable.java | 4 +-
.../src/main/resources/app/interfaces/types.d.ts | 6 +
.../src/main/resources/app/pages/Query.tsx | 10 +-
.../main/resources/app/utils/PinotMethodUtils.ts | 16 ++-
.../core/common/datatable/DataTableImplV3.java | 36 +++---
.../core/operator/InstanceResponseOperator.java | 72 ++++++-----
.../core/query/reduce/BrokerReduceService.java | 56 +++++++-
.../pinot/core/query/scheduler/QueryScheduler.java | 22 +++-
.../core/common/datatable/DataTableSerDeTest.java | 90 ++++++++++---
.../operator/ThreadCpuTimeMeasurementTest.java | 30 +++--
.../tests/OfflineClusterIntegrationTest.java | 4 +-
17 files changed, 547 insertions(+), 155 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java
index 514eb12..d95a5f5 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java
@@ -45,6 +45,61 @@ public class RequestStatistics {
private long _numSegmentsMatched;
private long _offlineThreadCpuTimeNs;
private long _realtimeThreadCpuTimeNs;
+ private long _offlineSystemActivitiesCpuTimeNs;
+ private long _realtimeSystemActivitiesCpuTimeNs;
+ private long _offlineResponseSerializationCpuTimeNs;
+ private long _realtimeResponseSerializationCpuTimeNs;
+ private long _offlineTotalCpuTimeNs;
+ private long _realtimeTotalCpuTimeNs;
+
+ public long getOfflineSystemActivitiesCpuTimeNs() {
+ return _offlineSystemActivitiesCpuTimeNs;
+ }
+
+ public void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs) {
+ _offlineSystemActivitiesCpuTimeNs = offlineSystemActivitiesCpuTimeNs;
+ }
+
+ public long getRealtimeSystemActivitiesCpuTimeNs() {
+ return _realtimeSystemActivitiesCpuTimeNs;
+ }
+
+ public void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs) {
+ _realtimeSystemActivitiesCpuTimeNs = realtimeSystemActivitiesCpuTimeNs;
+ }
+
+ public long getOfflineResponseSerializationCpuTimeNs() {
+ return _offlineResponseSerializationCpuTimeNs;
+ }
+
+ public void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs) {
+ _offlineResponseSerializationCpuTimeNs = offlineResponseSerializationCpuTimeNs;
+ }
+
+ public long getOfflineTotalCpuTimeNs() {
+ return _offlineTotalCpuTimeNs;
+ }
+
+ public void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs) {
+ _offlineTotalCpuTimeNs = offlineTotalCpuTimeNs;
+ }
+
+ public long getRealtimeResponseSerializationCpuTimeNs() {
+ return _realtimeResponseSerializationCpuTimeNs;
+ }
+
+ public void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs) {
+ _realtimeResponseSerializationCpuTimeNs = realtimeResponseSerializationCpuTimeNs;
+ }
+
+ public long getRealtimeTotalCpuTimeNs() {
+ return _realtimeTotalCpuTimeNs;
+ }
+
+ public void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs) {
+ _realtimeTotalCpuTimeNs = realtimeTotalCpuTimeNs;
+ }
+
private int _numServersQueried;
private int _numServersResponded;
private boolean _isNumGroupsLimitReached;
@@ -123,6 +178,12 @@ public class RequestStatistics {
_numExceptions = brokerResponse.getExceptionsSize();
_offlineThreadCpuTimeNs = brokerResponse.getOfflineThreadCpuTimeNs();
_realtimeThreadCpuTimeNs = brokerResponse.getRealtimeThreadCpuTimeNs();
+ _offlineSystemActivitiesCpuTimeNs = brokerResponse.getOfflineSystemActivitiesCpuTimeNs();
+ _realtimeSystemActivitiesCpuTimeNs = brokerResponse.getRealtimeSystemActivitiesCpuTimeNs();
+ _offlineResponseSerializationCpuTimeNs = brokerResponse.getOfflineResponseSerializationCpuTimeNs();
+ _realtimeResponseSerializationCpuTimeNs = brokerResponse.getRealtimeResponseSerializationCpuTimeNs();
+ _offlineTotalCpuTimeNs = brokerResponse.getOfflineTotalCpuTimeNs();
+ _realtimeTotalCpuTimeNs = brokerResponse.getRealtimeTotalCpuTimeNs();
_numRowsResultSet = brokerResponse.getNumRowsResultSet();
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 547c895..f9ae5c2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -542,7 +542,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{},"
+ "segments(queried/processed/matched/consuming/unavailable):{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
+ "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},"
- + "offlineThreadCpuTimeNs={},realtimeThreadCpuTimeNs={},query={}", requestId,
+ + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},"
+ + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{}," + "query={}", requestId,
brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
@@ -551,8 +552,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(),
brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(),
requestStatistics.getReduceTimeMillis(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
- brokerResponse.getOfflineThreadCpuTimeNs(), brokerResponse.getRealtimeThreadCpuTimeNs(),
- StringUtils.substring(query, 0, _queryLogLength));
+ brokerResponse.getOfflineTotalCpuTimeNs(), brokerResponse.getOfflineThreadCpuTimeNs(),
+ brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
+ brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(),
+ brokerResponse.getRealtimeThreadCpuTimeNs(), brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(),
+ brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), StringUtils.substring(query, 0, _queryLogLength));
// Limit the dropping log message at most once per second.
if (_numDroppedLogRateLimiter.tryAcquire()) {
@@ -836,8 +840,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
// Table name might have been changed (with suffix _OFFLINE/_REALTIME appended)
LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{},"
+ "segments(queried/processed/matched/consuming/unavailable):{}/{}/{}/{}/{},consumingFreshnessTimeMs={},"
- + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},query={},"
- + "offlineThreadCpuTimeNs={},realtimeThreadCpuTimeNs={}", requestId,
+ + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},"
+ + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{},"
+ + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{}," + "query={}", requestId,
brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(),
brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(),
brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(),
@@ -846,8 +851,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(),
brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(),
requestStatistics.getReduceTimeMillis(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
- StringUtils.substring(query, 0, _queryLogLength), brokerResponse.getOfflineThreadCpuTimeNs(),
- brokerResponse.getRealtimeThreadCpuTimeNs());
+ brokerResponse.getOfflineTotalCpuTimeNs(), brokerResponse.getOfflineThreadCpuTimeNs(),
+ brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
+ brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(),
+ brokerResponse.getRealtimeThreadCpuTimeNs(), brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(),
+ brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), StringUtils.substring(query, 0, _queryLogLength));
// Limit the dropping log message at most once per second.
if (_numDroppedLogRateLimiter.tryAcquire()) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
index 8bc8410..661e42a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java
@@ -33,10 +33,24 @@ public enum BrokerTimer implements AbstractMetrics.Timer {
// The latency of sending the request from broker to server
NETTY_CONNECTION_SEND_REQUEST_LATENCY(false),
- // aggregated query processing cost (thread cpu time in nanoseconds) from offline servers
- OFFLINE_THREAD_CPU_TIME_NS(
- false), // aggregated query processing cost (thread cpu time in nanoseconds) from realtime servers
- REALTIME_THREAD_CPU_TIME_NS(false);
+ // aggregated thread cpu time in nanoseconds for query processing from offline servers
+ OFFLINE_THREAD_CPU_TIME_NS(false),
+ // aggregated thread cpu time in nanoseconds for query processing from realtime servers
+ REALTIME_THREAD_CPU_TIME_NS(false),
+ // aggregated system activities cpu time in nanoseconds for query processing from offline servers
+ OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS(false),
+ // aggregated system activities cpu time in nanoseconds for query processing from realtime servers
+ REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS(false),
+ // aggregated response serialization cpu time in nanoseconds for query processing from offline servers
+ OFFLINE_RESPONSE_SER_CPU_TIME_NS(false),
+ // aggregated response serialization cpu time in nanoseconds for query processing from realtime servers
+ REALTIME_RESPONSE_SER_CPU_TIME_NS(false),
+ // aggregated total cpu time(thread + system activities + response serialization) in nanoseconds for query
+ // processing from offline servers
+ OFFLINE_TOTAL_CPU_TIME_NS(false),
+ // aggregated total cpu time(thread + system activities + response serialization) in nanoseconds for query
+ // processing from realtime servers
+ REALTIME_TOTAL_CPU_TIME_NS(false);
private final String _timerName;
private final boolean _global;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index c1e1615..b6d83bc 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -32,8 +32,18 @@ public enum ServerTimer implements AbstractMetrics.Timer {
// The latency of sending the response from server to broker
NETTY_CONNECTION_SEND_RESPONSE_LATENCY("nettyConnection", false),
- // Query cost (thread cpu time) for query processing on server
- EXECUTION_THREAD_CPU_TIME_NS("nanoseconds", false);
+ // Query cost (execution thread cpu time) for query processing on server
+ EXECUTION_THREAD_CPU_TIME_NS("nanoseconds", false),
+
+ // Query cost (system activities cpu time) for query processing on server
+ SYSTEM_ACTIVITIES_CPU_TIME_NS("nanoseconds", false),
+
+ // Query cost (response serialization cpu time) for query processing on server
+ RESPONSE_SER_CPU_TIME_NS("nanoseconds", false),
+
+ // Total query cost (thread cpu time + system activities cpu time + response serialization cpu time) for query
+ // processing on server
+ TOTAL_CPU_TIME_NS("nanoseconds", false);
private final String _timerName;
private final boolean _global;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
index a196668..5119f02 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java
@@ -52,16 +52,6 @@ public interface BrokerResponse {
void setTimeUsedMs(long timeUsedMs);
/**
- * Set the total thread cpu time used against realtime table in request handling, into the broker response.
- */
- void setRealtimeThreadCpuTimeNs(long realtimeThreadCpuTimeNs);
-
- /**
- * Set the total thread cpu time used against offline table in request handling, into the broker response.
- */
- void setOfflineThreadCpuTimeNs(long offlineThreadCpuTimeNs);
-
- /**
* Set the total number of rows in result set
*/
void setNumRowsResultSet(int numRowsResultSet);
@@ -143,17 +133,91 @@ public interface BrokerResponse {
List<QueryProcessingException> getProcessingExceptions();
/**
- * Get the total thread cpu time used against realtime table in request handling, into the broker response.
+ * Get the total number of rows in result set
*/
- long getRealtimeThreadCpuTimeNs();
+ int getNumRowsResultSet();
/**
- * Get the total thread cpu time used against offline table in request handling, into the broker response.
+ * Set the thread cpu time used against offline table in request handling, into the broker response.
+ */
+ void setOfflineThreadCpuTimeNs(long offlineThreadCpuTimeNs);
+
+ /**
+ * Get the thread cpu time used against offline table in request handling, from the broker response.
*/
long getOfflineThreadCpuTimeNs();
/**
- * Get the total number of rows in result set
+ * Get the thread cpu time used against realtime table in request handling, from the broker response.
*/
- int getNumRowsResultSet();
+ long getRealtimeThreadCpuTimeNs();
+
+ /**
+ * Set the thread cpu time used against realtime table in request handling, into the broker response.
+ */
+ void setRealtimeThreadCpuTimeNs(long realtimeThreadCpuTimeNs);
+
+ /**
+ * Get the system activities cpu time used against offline table in request handling, from the broker response.
+ */
+ long getOfflineSystemActivitiesCpuTimeNs();
+
+ /**
+ * Set the system activities cpu time used against offline table in request handling, into the broker response.
+ */
+ void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs);
+
+ /**
+ * Get the system activities cpu time used against realtime table in request handling, from the broker response.
+ */
+ long getRealtimeSystemActivitiesCpuTimeNs();
+
+ /**
+ * Set the system activities cpu time used against realtime table in request handling, into the broker response.
+ */
+ void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs);
+
+ /**
+ * Get the response serialization cpu time used against offline table in request handling, from the broker response.
+ */
+ long getOfflineResponseSerializationCpuTimeNs();
+
+ /**
+ * Set the response serialization cpu time used against offline table in request handling, into the broker response.
+ */
+ void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs);
+
+ /**
+ * Get the response serialization cpu time used against realtime table in request handling, from the broker response.
+ */
+ long getRealtimeResponseSerializationCpuTimeNs();
+
+ /**
+ * Set the response serialization cpu time used against realtime table in request handling, into the broker response.
+ */
+ void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs);
+
+ /**
+ * Get the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
+ * against offline table in request handling, from the broker response.
+ */
+ long getOfflineTotalCpuTimeNs();
+
+ /**
+ * Set the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
+ * against offline table in request handling, into the broker response.
+ */
+ void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs);
+
+ /**
+ * Get the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
+ * against realtime table in request handling, from the broker response.
+ */
+ long getRealtimeTotalCpuTimeNs();
+
+ /**
+ * Set the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used
+ * against realtime table in request handling, into the broker response.
+ */
+ void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs);
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index c6fc91c..665fb29 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -43,7 +43,10 @@ import org.apache.pinot.spi.utils.JsonUtils;
"selectionResults", "aggregationResults", "resultTable", "exceptions", "numServersQueried", "numServersResponded",
"numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numDocsScanned",
"numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs",
- "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "segmentStatistics", "traceInfo"
+ "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+ "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+ "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics",
+ "traceInfo"
})
public class BrokerResponseNative implements BrokerResponse {
public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
@@ -70,12 +73,16 @@ public class BrokerResponseNative implements BrokerResponse {
private long _timeUsedMs = 0L;
private long _offlineThreadCpuTimeNs = 0L;
private long _realtimeThreadCpuTimeNs = 0L;
+ private long _offlineSystemActivitiesCpuTimeNs = 0L;
+ private long _realtimeSystemActivitiesCpuTimeNs = 0L;
+ private long _offlineResponseSerializationCpuTimeNs = 0L;
+ private long _realtimeResponseSerializationCpuTimeNs = 0L;
+ private long _offlineTotalCpuTimeNs = 0L;
+ private long _realtimeTotalCpuTimeNs = 0L;
private int _numRowsResultSet = 0;
-
private SelectionResults _selectionResults;
private List<AggregationResult> _aggregationResults;
private ResultTable _resultTable;
-
private Map<String, String> _traceInfo = new HashMap<>();
private List<QueryProcessingException> _processingExceptions = new ArrayList<>();
private List<String> _segmentStatistics = new ArrayList<>();
@@ -100,6 +107,107 @@ public class BrokerResponseNative implements BrokerResponse {
return new BrokerResponseNative();
}
+ public static BrokerResponseNative fromJsonString(String jsonString)
+ throws IOException {
+ return JsonUtils.stringToObject(jsonString, BrokerResponseNative.class);
+ }
+
+ @JsonProperty("offlineSystemActivitiesCpuTimeNs")
+ @Override
+ public long getOfflineSystemActivitiesCpuTimeNs() {
+ return _offlineSystemActivitiesCpuTimeNs;
+ }
+
+ @JsonProperty("offlineSystemActivitiesCpuTimeNs")
+ @Override
+ public void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs) {
+ _offlineSystemActivitiesCpuTimeNs = offlineSystemActivitiesCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeSystemActivitiesCpuTimeNs")
+ @Override
+ public long getRealtimeSystemActivitiesCpuTimeNs() {
+ return _realtimeSystemActivitiesCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeSystemActivitiesCpuTimeNs")
+ @Override
+ public void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs) {
+ _realtimeSystemActivitiesCpuTimeNs = realtimeSystemActivitiesCpuTimeNs;
+ }
+
+ @JsonProperty("offlineThreadCpuTimeNs")
+ @Override
+ public long getOfflineThreadCpuTimeNs() {
+ return _offlineThreadCpuTimeNs;
+ }
+
+ @JsonProperty("offlineThreadCpuTimeNs")
+ @Override
+ public void setOfflineThreadCpuTimeNs(long timeUsedMs) {
+ _offlineThreadCpuTimeNs = timeUsedMs;
+ }
+
+ @JsonProperty("realtimeThreadCpuTimeNs")
+ @Override
+ public long getRealtimeThreadCpuTimeNs() {
+ return _realtimeThreadCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeThreadCpuTimeNs")
+ @Override
+ public void setRealtimeThreadCpuTimeNs(long timeUsedMs) {
+ _realtimeThreadCpuTimeNs = timeUsedMs;
+ }
+
+ @JsonProperty("offlineResponseSerializationCpuTimeNs")
+ @Override
+ public long getOfflineResponseSerializationCpuTimeNs() {
+ return _offlineResponseSerializationCpuTimeNs;
+ }
+
+ @JsonProperty("offlineResponseSerializationCpuTimeNs")
+ @Override
+ public void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs) {
+ _offlineResponseSerializationCpuTimeNs = offlineResponseSerializationCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeResponseSerializationCpuTimeNs")
+ @Override
+ public long getRealtimeResponseSerializationCpuTimeNs() {
+ return _realtimeResponseSerializationCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeResponseSerializationCpuTimeNs")
+ @Override
+ public void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs) {
+ _realtimeResponseSerializationCpuTimeNs = realtimeResponseSerializationCpuTimeNs;
+ }
+
+ @JsonProperty("offlineTotalCpuTimeNs")
+ @Override
+ public long getOfflineTotalCpuTimeNs() {
+ return _offlineTotalCpuTimeNs;
+ }
+
+ @JsonProperty("offlineTotalCpuTimeNs")
+ @Override
+ public void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs) {
+ _offlineTotalCpuTimeNs = offlineTotalCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeTotalCpuTimeNs")
+ @Override
+ public long getRealtimeTotalCpuTimeNs() {
+ return _realtimeTotalCpuTimeNs;
+ }
+
+ @JsonProperty("realtimeTotalCpuTimeNs")
+ @Override
+ public void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs) {
+ _realtimeTotalCpuTimeNs = realtimeTotalCpuTimeNs;
+ }
+
@JsonProperty("selectionResults")
@JsonInclude(JsonInclude.Include.NON_NULL)
public SelectionResults getSelectionResults() {
@@ -288,42 +396,18 @@ public class BrokerResponseNative implements BrokerResponse {
_timeUsedMs = timeUsedMs;
}
- @JsonProperty("offlineThreadCpuTimeNs")
- @Override
- public long getOfflineThreadCpuTimeNs() {
- return _offlineThreadCpuTimeNs;
- }
-
@JsonProperty("numRowsResultSet")
@Override
public int getNumRowsResultSet() {
return _numRowsResultSet;
}
- @JsonProperty("offlineThreadCpuTimeNs")
- @Override
- public void setOfflineThreadCpuTimeNs(long timeUsedMs) {
- _offlineThreadCpuTimeNs = timeUsedMs;
- }
-
@JsonProperty("numRowsResultSet")
@Override
public void setNumRowsResultSet(int numRowsResultSet) {
_numRowsResultSet = numRowsResultSet;
}
- @JsonProperty("realtimeThreadCpuTimeNs")
- @Override
- public long getRealtimeThreadCpuTimeNs() {
- return _realtimeThreadCpuTimeNs;
- }
-
- @JsonProperty("realtimeThreadCpuTimeNs")
- @Override
- public void setRealtimeThreadCpuTimeNs(long timeUsedMs) {
- _realtimeThreadCpuTimeNs = timeUsedMs;
- }
-
@JsonProperty("segmentStatistics")
public List<String> getSegmentStatistics() {
return _segmentStatistics;
@@ -350,11 +434,6 @@ public class BrokerResponseNative implements BrokerResponse {
return JsonUtils.objectToString(this);
}
- public static BrokerResponseNative fromJsonString(String jsonString)
- throws IOException {
- return JsonUtils.stringToObject(jsonString, BrokerResponseNative.class);
- }
-
@JsonIgnore
@Override
public void setExceptions(List<ProcessingException> exceptions) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 8e4abd8..9bea554 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -99,7 +99,9 @@ public interface DataTable {
REQUEST_ID("requestId", MetadataValueType.LONG),
NUM_RESIZES("numResizes", MetadataValueType.INT),
RESIZE_TIME_MS("resizeTimeMs", MetadataValueType.LONG),
- THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG);
+ THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG),
+ SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", MetadataValueType.LONG),
+ RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", MetadataValueType.LONG);
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>();
private final String _name;
diff --git a/pinot-controller/src/main/resources/app/interfaces/types.d.ts b/pinot-controller/src/main/resources/app/interfaces/types.d.ts
index f79cc24..a7a446a 100644
--- a/pinot-controller/src/main/resources/app/interfaces/types.d.ts
+++ b/pinot-controller/src/main/resources/app/interfaces/types.d.ts
@@ -121,6 +121,12 @@ declare module 'Models' {
minConsumingFreshnessTimeMs: number
offlineThreadCpuTimeNs: number
realtimeThreadCpuTimeNs: number
+ offlineSystemActivitiesCpuTimeNs: number
+ realtimeSystemActivitiesCpuTimeNs: number
+ offlineResponseSerializationCpuTimeNs: number
+ realtimeResponseSerializationCpuTimeNs: number
+ offlineTotalCpuTimeNs: number
+ realtimeTotalCpuTimeNs: number
};
export type ClusterName = {
diff --git a/pinot-controller/src/main/resources/app/pages/Query.tsx b/pinot-controller/src/main/resources/app/pages/Query.tsx
index 77f658e..b5fd747 100644
--- a/pinot-controller/src/main/resources/app/pages/Query.tsx
+++ b/pinot-controller/src/main/resources/app/pages/Query.tsx
@@ -138,7 +138,13 @@ const responseStatCols = [
'partialResponse',
'minConsumingFreshnessTimeMs',
'offlineThreadCpuTimeNs',
- 'realtimeThreadCpuTimeNs'
+ 'realtimeThreadCpuTimeNs',
+ 'offlineSystemActivitiesCpuTimeNs',
+ 'realtimeSystemActivitiesCpuTimeNs',
+ 'offlineResponseSerializationCpuTimeNs',
+ 'realtimeResponseSerializationCpuTimeNs',
+ 'offlineTotalCpuTimeNs',
+ 'realtimeTotalCpuTimeNs'
];
const QueryPage = () => {
@@ -189,7 +195,7 @@ const QueryPage = () => {
const handleOutputDataChange = (editor, data, value) => {
setInputQuery(value);
};
-
+
const handleQueryInterfaceKeyDown = (editor, event) => {
// Map Cmd + Enter KeyPress to executing the query
if (event.metaKey == true && event.keyCode == 13) {
diff --git a/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts b/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts
index 468852d..8634a11 100644
--- a/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts
+++ b/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts
@@ -185,7 +185,7 @@ const getClusterConfigJSON = () => {
// Expected Output: {columns: [], records: []}
const getQueryTablesList = ({bothType = false}) => {
const promiseArr = bothType ? [getQueryTables('realtime'), getQueryTables('offline')] : [getQueryTables()];
-
+
return Promise.all(promiseArr).then((results) => {
const responseObj = {
columns: ['Tables'],
@@ -295,7 +295,14 @@ const getQueryResults = (params, url, checkedOptions) => {
'partialResponse',
'minConsumingFreshnessTimeMs',
'offlineThreadCpuTimeNs',
- 'realtimeThreadCpuTimeNs'];
+ 'realtimeThreadCpuTimeNs',
+ 'offlineSystemActivitiesCpuTimeNs',
+ 'realtimeSystemActivitiesCpuTimeNs',
+ 'offlineResponseSerializationCpuTimeNs',
+ 'realtimeResponseSerializationCpuTimeNs',
+ 'offlineTotalCpuTimeNs',
+ 'realtimeTotalCpuTimeNs'
+ ];
return {
result: {
@@ -308,7 +315,10 @@ const getQueryResults = (params, url, checkedOptions) => {
queryResponse.numSegmentsQueried, queryResponse.numSegmentsProcessed, queryResponse.numSegmentsMatched, queryResponse.numConsumingSegmentsQueried,
queryResponse.numEntriesScannedInFilter, queryResponse.numEntriesScannedPostFilter, queryResponse.numGroupsLimitReached,
queryResponse.partialResponse ? queryResponse.partialResponse : '-', queryResponse.minConsumingFreshnessTimeMs,
- queryResponse.offlineThreadCpuTimeNs, queryResponse.realtimeThreadCpuTimeNs]]
+ queryResponse.offlineThreadCpuTimeNs, queryResponse.realtimeThreadCpuTimeNs,
+ queryResponse.offlineSystemActivitiesCpuTimeNs, queryResponse.realtimeSystemActivitiesCpuTimeNs,
+ queryResponse.offlineResponseSerializationCpuTimeNs, queryResponse.realtimeResponseSerializationCpuTimeNs,
+ queryResponse.offlineTotalCpuTimeNs, queryResponse.realtimeTotalCpuTimeNs]]
},
data: queryResponse,
};
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index f886075..930c1dd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -192,6 +192,26 @@ public class DataTableImplV3 extends BaseDataTable {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+ writeLeadingSections(dataOutputStream);
+
+ // Add table serialization time metadata if thread timer is enabled.
+ if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ long responseSerializationCpuTimeNs = threadTimer.stopAndGetThreadTimeNs();
+ getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
+ }
+
+ // Write metadata: length followed by actual metadata bytes.
+ // NOTE: Metadata serialization time is not included in "responseSerializationCpuTimeNs": it is negligible, and
+ // accounting for it would add a lot of code complexity.
+ byte[] metadataBytes = serializeMetadata();
+ dataOutputStream.writeInt(metadataBytes.length);
+ dataOutputStream.write(metadataBytes);
+
+ return byteArrayOutputStream.toByteArray();
+ }
+
+ private void writeLeadingSections(DataOutputStream dataOutputStream)
+ throws IOException {
dataOutputStream.writeInt(DataTableBuilder.VERSION_3);
dataOutputStream.writeInt(_numRows);
dataOutputStream.writeInt(_numColumns);
@@ -262,22 +282,6 @@ public class DataTableImplV3 extends BaseDataTable {
if (_variableSizeDataBytes != null) {
dataOutputStream.write(_variableSizeDataBytes);
}
-
- // Update the value of "threadCpuTimeNs" to account data table serialization time.
- long responseSerializationCpuTimeNs = threadTimer.stopAndGetThreadTimeNs();
- // TODO: currently log/emit a total thread cpu time for query execution time and data table serialization time.
- // Figure out a way to log/emit separately. Probably via providing an API on the DataTable to get/set query
- // context, which is supposed to be used at server side only.
- long threadCpuTimeNs = Long.parseLong(getMetadata().getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0"))
- + responseSerializationCpuTimeNs;
- getMetadata().put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs));
-
- // Write metadata: length followed by actual metadata bytes.
- byte[] metadataBytes = serializeMetadata();
- dataOutputStream.writeInt(metadataBytes.length);
- dataOutputStream.write(metadataBytes);
-
- return byteArrayOutputStream.toByteArray();
}
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
index a60ce70..a6b8630 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator;
import java.util.List;
+import java.util.Map;
import org.apache.pinot.common.utils.DataTable.MetadataKey;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
@@ -44,14 +45,40 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
_fetchContextSize = fetchContexts.size();
}
+ /*
+ * Derive systemActivitiesCpuTimeNs from totalWallClockTimeNs, multipleThreadCpuTimeNs, mainThreadCpuTimeNs,
+ * and numServerThreads. The result is rounded to the nearest nanosecond.
+ * NOTE(review): assumes numServerThreads > 0; the floating-point division does not throw on 0 - confirm callers.
+ * For example, let's divide query processing into 4 phases:
+ * - phase 1: single thread (main thread) preparing. Time used: T1
+ * - phase 2: N threads processing segments in parallel, each thread uses time T2
+ * - phase 3: system activities (GC/OS paging). Time used: T3
+ * - phase 4: single thread (main thread) merging intermediate results blocks. Time used: T4
+ *
+ * Then we have the following equations:
+ * - mainThreadCpuTimeNs = T1 + T4
+ * - multipleThreadCpuTimeNs = T2 * N, so T2 is estimated as multipleThreadCpuTimeNs / numServerThreads
+ * - totalWallClockTimeNs = T1 + T2 + T3 + T4 = mainThreadCpuTimeNs + T2 + T3
+ * - systemActivitiesCpuTimeNs = T3 = totalWallClockTimeNs - mainThreadCpuTimeNs - T2
+ */
+ public static long calSystemActivitiesCpuTimeNs(long totalWallClockTimeNs, long multipleThreadCpuTimeNs,
+ long mainThreadCpuTimeNs, int numServerThreads) {
+ double perMultipleThreadCpuTimeNs = multipleThreadCpuTimeNs * 1.0 / numServerThreads;
+ double systemActivitiesCpuTimeNs = (totalWallClockTimeNs - mainThreadCpuTimeNs - perMultipleThreadCpuTimeNs);
+ return Math.round(systemActivitiesCpuTimeNs);
+ }
+
@Override
protected InstanceResponseBlock getNextBlock() {
if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ ThreadTimer mainThreadTimer = new ThreadTimer();
+ mainThreadTimer.start();
+
long startWallClockTimeNs = System.nanoTime();
IntermediateResultsBlock intermediateResultsBlock = getCombinedResults();
InstanceResponseBlock instanceResponseBlock = new InstanceResponseBlock(intermediateResultsBlock);
long totalWallClockTimeNs = System.nanoTime() - startWallClockTimeNs;
-
+ long mainThreadCpuTimeNs = mainThreadTimer.stopAndGetThreadTimeNs();
/*
* If/when the threadCpuTime based instrumentation is done for other parts of execution (planning, pruning etc),
* we will have to change the wallClockTime computation accordingly. Right now everything under
@@ -59,11 +86,16 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
*/
long multipleThreadCpuTimeNs = intermediateResultsBlock.getExecutionThreadCpuTimeNs();
int numServerThreads = intermediateResultsBlock.getNumServerThreads();
- long totalThreadCpuTimeNs =
- calTotalThreadCpuTimeNs(totalWallClockTimeNs, multipleThreadCpuTimeNs, numServerThreads);
+ long systemActivitiesCpuTimeNs =
+ calSystemActivitiesCpuTimeNs(totalWallClockTimeNs, multipleThreadCpuTimeNs, mainThreadCpuTimeNs,
+ numServerThreads);
+
+ long threadCpuTimeNs = mainThreadCpuTimeNs + multipleThreadCpuTimeNs;
+ Map<String, String> responseMetaData = instanceResponseBlock.getInstanceResponseDataTable().getMetadata();
+ responseMetaData.put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs));
+ responseMetaData
+ .put(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), String.valueOf(systemActivitiesCpuTimeNs));
- instanceResponseBlock.getInstanceResponseDataTable().getMetadata()
- .put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(totalThreadCpuTimeNs));
return instanceResponseBlock;
} else {
return new InstanceResponseBlock(getCombinedResults());
@@ -79,36 +111,6 @@ public class InstanceResponseOperator extends BaseOperator<InstanceResponseBlock
}
}
- /*
- * Calculate totalThreadCpuTimeNs based on totalWallClockTimeNs, multipleThreadCpuTimeNs, and numServerThreads.
- * System activities time such as OS paging, GC, context switching are not captured by totalThreadCpuTimeNs.
- * For example, let's divide query processing into 4 phases:
- * - phase 1: single thread preparing. Time used: T1
- * - phase 2: N threads processing segments in parallel, each thread use time T2
- * - phase 3: GC/OS paging. Time used: T3
- * - phase 4: single thread merging intermediate results blocks. Time used: T4
- *
- * Then we have following equations:
- * - singleThreadCpuTimeNs = T1 + T4
- * - multipleThreadCpuTimeNs = T2 * N
- * - totalWallClockTimeNs = T1 + T2 + T3 + T4 = singleThreadCpuTimeNs + T2 + T3
- * - totalThreadCpuTimeNsWithoutSystemActivities = T1 + T2 * N + T4 = singleThreadCpuTimeNs + T2 * N
- * - systemActivitiesTimeNs = T3 = (totalWallClockTimeNs - totalThreadCpuTimeNsWithoutSystemActivities) + T2 * (N - 1)
- *
- * Thus:
- * totalThreadCpuTimeNsWithSystemActivities = totalThreadCpuTimeNsWithoutSystemActivities + systemActivitiesTimeNs
- * = totalThreadCpuTimeNsWithoutSystemActivities + T3
- * = totalThreadCpuTimeNsWithoutSystemActivities + (totalWallClockTimeNs -
- * totalThreadCpuTimeNsWithoutSystemActivities) + T2 * (N - 1)
- * = totalWallClockTimeNs + T2 * (N - 1)
- * = totalWallClockTimeNs + (multipleThreadCpuTimeNs / N) * (N - 1)
- */
- public static long calTotalThreadCpuTimeNs(long totalWallClockTimeNs, long multipleThreadCpuTimeNs,
- int numServerThreads) {
- double perThreadCpuTimeNs = multipleThreadCpuTimeNs * 1.0 / numServerThreads;
- return Math.round(totalWallClockTimeNs + perThreadCpuTimeNs * (numServerThreads - 1));
- }
-
@Override
public String getOperatorName() {
return OPERATOR_NAME;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 77aae3d..fa22e54 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -140,6 +140,13 @@ public class BrokerReduceService {
long numTotalDocs = 0L;
long offlineThreadCpuTimeNs = 0L;
long realtimeThreadCpuTimeNs = 0L;
+ long offlineSystemActivitiesCpuTimeNs = 0L;
+ long realtimeSystemActivitiesCpuTimeNs = 0L;
+ long offlineResponseSerializationCpuTimeNs = 0L;
+ long realtimeResponseSerializationCpuTimeNs = 0L;
+ long offlineTotalCpuTimeNs = 0L;
+ long realtimeTotalCpuTimeNs = 0L;
+
boolean numGroupsLimitReached = false;
PinotQuery pinotQuery = brokerRequest.getPinotQuery();
@@ -217,6 +224,28 @@ public class BrokerReduceService {
}
}
+ String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName());
+ if (systemActivitiesCpuTimeNsString != null) {
+ if (entry.getKey().getTableType() == TableType.OFFLINE) {
+ offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+ } else {
+ realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString);
+ }
+ }
+
+ String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+ if (responseSerializationCpuTimeNsString != null) {
+ if (entry.getKey().getTableType() == TableType.OFFLINE) {
+ offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+ } else {
+ realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString);
+ }
+ }
+ offlineTotalCpuTimeNs =
+ offlineThreadCpuTimeNs + offlineSystemActivitiesCpuTimeNs + offlineResponseSerializationCpuTimeNs;
+ realtimeTotalCpuTimeNs =
+ realtimeThreadCpuTimeNs + realtimeSystemActivitiesCpuTimeNs + realtimeResponseSerializationCpuTimeNs;
+
String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName());
if (numTotalDocsString != null) {
numTotalDocs += Long.parseLong(numTotalDocsString);
@@ -251,6 +280,12 @@ public class BrokerReduceService {
brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached);
brokerResponseNative.setOfflineThreadCpuTimeNs(offlineThreadCpuTimeNs);
brokerResponseNative.setRealtimeThreadCpuTimeNs(realtimeThreadCpuTimeNs);
+ brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(offlineSystemActivitiesCpuTimeNs);
+ brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(realtimeSystemActivitiesCpuTimeNs);
+ brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(offlineResponseSerializationCpuTimeNs);
+ brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(realtimeResponseSerializationCpuTimeNs);
+ brokerResponseNative.setOfflineTotalCpuTimeNs(offlineTotalCpuTimeNs);
+ brokerResponseNative.setRealtimeTotalCpuTimeNs(realtimeTotalCpuTimeNs);
if (numConsumingSegmentsProcessed > 0) {
brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsProcessed);
brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs);
@@ -261,14 +296,27 @@ public class BrokerReduceService {
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
if (brokerMetrics != null) {
brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, numDocsScanned);
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER,
- numEntriesScannedInFilter);
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER,
- numEntriesScannedPostFilter);
+ brokerMetrics
+ .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter);
+ brokerMetrics
+ .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, offlineThreadCpuTimeNs,
TimeUnit.NANOSECONDS);
brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, realtimeThreadCpuTimeNs,
TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+ offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS,
+ realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS,
+ offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS,
+ realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, offlineTotalCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, realtimeTotalCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+
if (numConsumingSegmentsProcessed > 0 && minConsumingFreshnessTimeMs > 0) {
brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS,
System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 3ed9b90..07cc18b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -188,6 +188,11 @@ public abstract class QueryScheduler {
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS));
long threadCpuTimeNs =
Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0"));
+ long systemActivitiesCpuTimeNs =
+ Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0"));
+ long responseSerializationCpuTimeNs =
+ Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0"));
+ long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + responseSerializationCpuTimeNs;
if (numDocsScanned > 0) {
_serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned);
@@ -210,6 +215,18 @@ public abstract class QueryScheduler {
_serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs,
TimeUnit.NANOSECONDS);
}
+ if (systemActivitiesCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS,
+ systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS);
+ }
+ if (responseSerializationCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.RESPONSE_SER_CPU_TIME_NS,
+ responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS);
+ }
+ if (totalCpuTimeNs > 0) {
+ _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs,
+ TimeUnit.NANOSECONDS);
+ }
TimerContext timerContext = queryRequest.getTimerContext();
int numSegmentsQueried = queryRequest.getSegmentsToQuery().size();
@@ -221,14 +238,15 @@ public abstract class QueryScheduler {
LOGGER.info("Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{},"
+ "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={},"
+ "minConsumingFreshnessMs={},broker={},"
- + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},threadCpuTimeNs={}", requestId,
+ + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},"
+ + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId,
tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming,
schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING),
timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION),
timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs,
queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name(),
- threadCpuTimeNs);
+ totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, responseSerializationCpuTimeNs);
// Limit the dropping log message at most once per second.
if (_numDroppedLogRateLimiter.tryAcquire()) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
index 9dc06b2..00b86488 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java
@@ -202,7 +202,9 @@ public class DataTableSerDeTest {
Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
- // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server.
+ // Verify V3 broker can deserialize a data table (has data, but has no metadata) sent by V3 server (with
+ // ThreadCpuTimeMeasurement disabled)
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
DataTableBuilder dataTableBuilderV3WithDataOnly = new DataTableBuilder(dataSchema);
fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns);
@@ -212,22 +214,22 @@ public class DataTableSerDeTest {
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS KV pair into metadata
- Assert.assertEquals(newDataTable.getMetadata().size(), 1);
- Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.THREAD_CPU_TIME_NS.getName()));
+ Assert.assertEquals(newDataTable.getMetadata().size(), 0);
- // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server
+ // Verify V3 broker can deserialize data table (has data and metadata) sent by V3 server (with
+ // ThreadCpuTimeMeasurement disabled)
for (String key : EXPECTED_METADATA.keySet()) {
dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
}
+ // Deserialize data table bytes as V3
newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
- newDataTable.getMetadata().remove(MetadataKey.THREAD_CPU_TIME_NS.getName());
Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
- // Verify V3 broker can deserialize data table (only has metadata) send by V3 server
+ // Verify V3 broker can deserialize data table (only has metadata) sent by V3 server (with
+ // ThreadCpuTimeMeasurement disabled)
DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new DataTableBuilder(dataSchema);
dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
for (String key : EXPECTED_METADATA.keySet()) {
@@ -236,7 +238,50 @@ public class DataTableSerDeTest {
newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
- newDataTable.getMetadata().remove(MetadataKey.THREAD_CPU_TIME_NS.getName());
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V3 broker can deserialize a data table (has data, but has no metadata) sent by V3 server (with
+ // ThreadCpuTimeMeasurement enabled)
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
+ DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3);
+ dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table
+ // Deserialize data table bytes as V3
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes());
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ Assert.assertEquals(newDataTable.getMetadata().size(), 1);
+ Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
+
+ // Verify V3 broker can deserialize data table (has data and metadata) sent by V3 server (with
+ // ThreadCpuTimeMeasurement enabled)
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ // Deserialize data table bytes as V3
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE);
+ verifyDataIsSame(newDataTable, columnDataTypes, numColumns);
+ if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
+ newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+ }
+ Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
+
+ // Verify V3 broker can deserialize data table (only has metadata) sent by V3 server (with
+ // ThreadCpuTimeMeasurement enabled)
+ dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table
+ for (String key : EXPECTED_METADATA.keySet()) {
+ dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key));
+ }
+ newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3
+ Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE);
+ Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0);
+ if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
+ Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1);
+ newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName());
+ }
Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA);
}
@@ -255,15 +300,24 @@ public class DataTableSerDeTest {
fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns);
DataTable dataTable = dataTableBuilder.build();
+
+ // Disable ThreadCpuTimeMeasurement, serialize/de-serialize data table.
+ ThreadTimer.setThreadCpuTimeMeasurementEnabled(false);
DataTable newDataTable = DataTableFactory.getDataTable(dataTable.toBytes());
- // When ThreadCpuTimeMeasurement is disabled, value of threadCpuTimeNs is 0.
- Assert.assertEquals(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()), String.valueOf(0));
+ // When ThreadCpuTimeMeasurement is disabled, no value for
+ // threadCpuTimeNs/systemActivitiesCpuTimeNs/responseSerializationCpuTimeNs.
+ Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()));
+ Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName()));
+ Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
- // Enable ThreadCpuTimeMeasurement, serialize/de-serialize data table again.
+ // Enable ThreadCpuTimeMeasurement, serialize/de-serialize data table.
ThreadTimer.setThreadCpuTimeMeasurementEnabled(true);
newDataTable = DataTableFactory.getDataTable(dataTable.toBytes());
- // When ThreadCpuTimeMeasurement is enabled, value of threadCpuTimeNs is not 0.
- Assert.assertNotEquals(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()), String.valueOf(0));
+ // When ThreadCpuTimeMeasurement is enabled, value of responseSerializationCpuTimeNs is not 0.
+ Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()));
+ Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName()));
+ Assert.assertTrue(
+ Integer.parseInt(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())) > 0);
}
@Test
@@ -311,8 +365,8 @@ public class DataTableSerDeTest {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes);
DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) {
int numEntries = dataInputStream.readInt();
- // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS KV pair into metadata
- Assert.assertEquals(numEntries, EXPECTED_METADATA.size() + 1);
+ // The serialized metadata is expected to hold exactly the EXPECTED_METADATA entries (no extra CPU time KV pair)
+ Assert.assertEquals(numEntries, EXPECTED_METADATA.size());
for (int i = 0; i < numEntries; i++) {
int keyOrdinal = dataInputStream.readInt();
DataTable.MetadataKey key = MetadataKey.getByOrdinal(keyOrdinal);
@@ -324,8 +378,10 @@ public class DataTableSerDeTest {
} else if (key.getValueType() == DataTable.MetadataValueType.LONG) {
byte[] actualBytes = new byte[Long.BYTES];
dataInputStream.read(actualBytes);
- // Ignore the THREAD_CPU_TIME_NS key since it's added during data serialization.
- if (key != MetadataKey.THREAD_CPU_TIME_NS) {
+ // Ignore the THREAD_CPU_TIME_NS/SYSTEM_ACTIVITIES_CPU_TIME_NS/RESPONSE_SER_CPU_TIME_NS keys since
+ // their values are evaluated during query execution.
+ if (key != MetadataKey.THREAD_CPU_TIME_NS && key != MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS
+ && key != MetadataKey.RESPONSE_SER_CPU_TIME_NS) {
Assert.assertEquals(actualBytes, Longs.toByteArray(Long.parseLong(EXPECTED_METADATA.get(key.getName()))));
}
} else {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java
index a771965..543da75 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java
@@ -30,32 +30,36 @@ public class ThreadCpuTimeMeasurementTest {
class TestCase {
final long _totalWallClockTimeNs;
final long _multipleThreadCpuTimeNs;
+ final long _singleThreadCpuTimeNs;
final int _numServerThreads;
- final long _totalThreadCpuTimeNs;
+ final long _systemActivitiesCpuTimeNs;
- TestCase(long totalWallClockTimeNs, long multipleThreadCpuTimeNs, int numServerThreads,
- long totalThreadCpuTimeNs) {
+ TestCase(long totalWallClockTimeNs, long multipleThreadCpuTimeNs, long singleThreadCpuTimeNs,
+ int numServerThreads, long systemActivitiesCpuTimeNs) {
_totalWallClockTimeNs = totalWallClockTimeNs;
_multipleThreadCpuTimeNs = multipleThreadCpuTimeNs;
+ _singleThreadCpuTimeNs = singleThreadCpuTimeNs;
_numServerThreads = numServerThreads;
- _totalThreadCpuTimeNs = totalThreadCpuTimeNs;
+ _systemActivitiesCpuTimeNs = systemActivitiesCpuTimeNs;
}
}
- TestCase[] testCases = new TestCase[]{
- new TestCase(4245673, 7124487, 3, 8995331), new TestCase(21500000, 10962161, 2, 26981081),
- new TestCase(59000000, 23690790, 1, 59000000), new TestCase(59124358, 11321792, 5, 68181792),
- new TestCase(79888780, 35537324, 7, 110349343), new TestCase(915432, 2462128, 4, 2762028)
- };
+ TestCase[] testCases =
+ new TestCase[]{new TestCase(4245673, 7124487, 1717171, 3, 153673), new TestCase(21500000, 10962161, 837, 2,
+ 16018083), new TestCase(59000000, 23690790, 4875647, 1, 30433563), new TestCase(59124358, 11321792, 164646,
+ 5, 56695354), new TestCase(79888780, 35537324, 16464, 7, 74795555), new TestCase(915432, 2462128, 63383, 4,
+ 236517)};
for (TestCase testCase : testCases) {
long totalWallClockTimeNs = testCase._totalWallClockTimeNs;
long multipleThreadCpuTimeNs = testCase._multipleThreadCpuTimeNs;
+ long singleThreadCpuTimeNs = testCase._singleThreadCpuTimeNs;
int numServerThreads = testCase._numServerThreads;
- long expectedTotalThreadCpuTimeNs = testCase._totalThreadCpuTimeNs;
- long actualTotalThreadCpuTimeNs = InstanceResponseOperator
- .calTotalThreadCpuTimeNs(totalWallClockTimeNs, multipleThreadCpuTimeNs, numServerThreads);
- Assert.equals(expectedTotalThreadCpuTimeNs, actualTotalThreadCpuTimeNs);
+ long expectedSystemActivitiesCpuTimeNs = testCase._systemActivitiesCpuTimeNs;
+ long actualSystemActivitiesCpuTimeNs = InstanceResponseOperator
+ .calSystemActivitiesCpuTimeNs(totalWallClockTimeNs, multipleThreadCpuTimeNs, singleThreadCpuTimeNs,
+ numServerThreads);
+ Assert.equals(expectedSystemActivitiesCpuTimeNs, actualSystemActivitiesCpuTimeNs);
}
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 609b26d..3643095 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1856,9 +1856,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
String responseType =
streamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE);
if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) {
- // verify the returned data table metadata only contains "threadCpuTimeNs".
+ // verify the returned data table metadata only contains "responseSerializationCpuTimeNs".
Map<String, String> metadata = dataTable.getMetadata();
- assertTrue(metadata.size() == 1 && metadata.containsKey(MetadataKey.THREAD_CPU_TIME_NS.getName()));
+ assertTrue(metadata.size() == 1 && metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()));
assertNotNull(dataTable.getDataSchema());
numTotalDocs += dataTable.getNumberOfRows();
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org