You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/03/10 05:52:55 UTC

[kylin] branch main updated: KYLIN-5121 fix key not found: numOutputRows on s3

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/main by this push:
     new d56d72f  KYLIN-5121 fix key not found: numOutputRows on s3
d56d72f is described below

commit d56d72f736db5ea52bcc9a672dd20dc6ee4aaaae
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Mar 10 10:03:08 2022 +0800

    KYLIN-5121 fix key not found: numOutputRows on s3
---
 .../apache/kylin/engine/spark/utils/JobMetricsUtils.scala  | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index 3130130..437d8b0 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -56,11 +56,15 @@ object JobMetricsUtils extends Logging {
       case plan: UnaryExecNode =>
         if (aggs.contains(plan.getClass) && !afterAgg) {
           afterAgg = true
-          rowMetrics.setMetrics(Metrics.CUBOID_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+          if (plan.metrics.contains("numOutputRows")) {
+            rowMetrics.setMetrics(Metrics.CUBOID_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+          }
         }
       case plan: BinaryExecNode =>
         if (joins.contains(plan.getClass) && !afterJoin) {
-          rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+          if (plan.metrics.contains("numOutputRows")) {
+            rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, plan.metrics.apply("numOutputRows").value)
+          }
           afterJoin = true
         }
       case plan: LeafExecNode =>
@@ -72,8 +76,10 @@ object JobMetricsUtils extends Logging {
             rowMetrics.getMetrics(Metrics.SOURCE_ROWS_CNT)
           }
 
-          val rowsCnt = preCnt + plan.metrics.apply("numOutputRows").value
-          rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, rowsCnt)
+          if (plan.metrics.contains("numOutputRows")) {
+            val rowsCnt = preCnt + plan.metrics.apply("numOutputRows").value
+            rowMetrics.setMetrics(Metrics.SOURCE_ROWS_CNT, rowsCnt)
+          }
         }
       case _ =>
     }