You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/03/10 05:52:55 UTC
[kylin] branch main updated: KYLIN-5121 fix key not found: numOutputRows on s3
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/main by this push:
new d56d72f KYLIN-5121 fix key not found: numOutputRows on s3
d56d72f is described below
commit d56d72f736db5ea52bcc9a672dd20dc6ee4aaaae
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Mar 10 10:03:08 2022 +0800
KYLIN-5121 fix key not found: numOutputRows on s3
---
.../apache/kylin/engine/spark/utils/JobMetricsUtils.scala | 14 ++++++++++----
1 file changed, 10 insertions(+), 4 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index 3130130..437d8b0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -56,11 +56,15 @@ object JobMetricsUtils extends Logging {
case plan: UnaryExecNode =>
if (aggs.contains(plan.getClass) && !afterAgg) {
afterAgg = true
- rowMetrics.setMetrics(Metrics.CUBOID_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+ if (plan.metrics.contains("numOutputRows")) {
+ rowMetrics.setMetrics(Metrics.CUBOID_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+ }
}
case plan: BinaryExecNode =>
if (joins.contains(plan.getClass) && !afterJoin) {
- rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+ if (plan.metrics.contains("numOutputRows")) {
+ rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+ }
afterJoin = true
}
case plan: LeafExecNode =>
@@ -72,8 +76,10 @@ object JobMetricsUtils extends Logging {
rowMetrics.getMetrics(Metrics.SOURCE_ROWS_CNT)
}
- val rowsCnt = preCnt + plan.metrics.apply("numOutputRows").value
- rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, rowsCnt)
+ if (plan.metrics.contains("numOutputRows")) {
+ val rowsCnt = preCnt + plan.metrics.apply("numOutputRows").value
+ rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, rowsCnt)
+ }
}
case _ =>
}