You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/01/19 08:11:15 UTC

[kylin] branch main updated (ad36cf7 -> 8365e66)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from ad36cf7  [KYLIN-5125] fix up unit test
     new dc3fd9a  [KYLIN-5121] Make JobMetricsUtils.collectMetrics be working again
     new 8365e66  add license description

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kylin/engine/spark/job/OptimizeBuildJob.java   | 28 ++++----
 .../kylin/engine/spark/job/CubeBuildJob.java       | 68 +++++++++----------
 .../kylin/engine/spark/job/CubeMergeJob.java       | 41 +++++------
 .../kylin/engine/spark/utils/JobMetricsUtils.scala | 79 +++++-----------------
 .../utils/QueryExecutionInterceptListener.scala    | 51 ++++++++++++++
 5 files changed, 128 insertions(+), 139 deletions(-)
 create mode 100644 kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala

[kylin] 02/02: add license description

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 8365e66d81cd3f804a36571bed1892aa45360094
Author: hujiahua <hu...@youzan.com>
AuthorDate: Sat Nov 13 09:20:08 2021 +0800

    add license description
---
 .../spark/utils/QueryExecutionInterceptListener.scala  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
index d9d64ef..88a1d64 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
 package org.apache.kylin.engine.spark.utils
 
 import org.apache.spark.sql.execution.QueryExecution

[kylin] 01/02: [KYLIN-5121] Make JobMetricsUtils.collectMetrics be working again

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dc3fd9ab25ecd03ec53f8855f5366db8de542ce5
Author: hujiahua <hu...@youzan.com>
AuthorDate: Thu Nov 11 14:39:03 2021 +0800

    [KYLIN-5121] Make JobMetricsUtils.collectMetrics be working again
---
 .../kylin/engine/spark/job/OptimizeBuildJob.java   | 28 ++++----
 .../kylin/engine/spark/job/CubeBuildJob.java       | 68 +++++++++----------
 .../kylin/engine/spark/job/CubeMergeJob.java       | 41 +++++------
 .../kylin/engine/spark/utils/JobMetricsUtils.scala | 79 +++++-----------------
 .../utils/QueryExecutionInterceptListener.scala    | 33 +++++++++
 5 files changed, 110 insertions(+), 139 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 96ca190..8a412d5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -41,16 +41,15 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
@@ -63,16 +62,13 @@ import scala.Tuple2;
 import scala.collection.JavaConversions;
 
 import java.io.IOException;
-
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.ArrayList;
-import java.util.UUID;
-
 import java.util.stream.Collectors;
 
 public class OptimizeBuildJob extends SparkApplication {
@@ -378,15 +374,15 @@ public class OptimizeBuildJob extends SparkApplication {
                                      long parentId) throws IOException {
         long layoutId = layout.getId();
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(),
                 String.valueOf(layoutId));
+
         String tempPath = path + TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         logger.info("Cuboids are saved to temp path : " + tempPath);
         storage.saveTo(tempPath, dataset, ss);
@@ -402,14 +398,14 @@ public class OptimizeBuildJob extends SparkApplication {
             cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
             layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
+            cuboidsRowCount.putIfAbsent(layoutId, rowCount);
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short) shardNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index c30c206..a5f0e11 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -18,41 +18,21 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.cuboid.CuboidModeEnum;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
-import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.shaded.com.google.common.base.Joiner;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
@@ -66,9 +46,14 @@ import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -77,14 +62,24 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 public class CubeBuildJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
     protected static String TEMP_DIR_SUFFIX = "_temp";
@@ -457,15 +452,14 @@ public class CubeBuildJob extends SparkApplication {
                                      long parentId) throws IOException {
         long layoutId = layout.getId();
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(),
                 String.valueOf(layoutId));
         String tempPath = path + TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         logger.info("Cuboids are saved to temp path : " + tempPath);
         storage.saveTo(tempPath, dataset, ss);
@@ -479,14 +473,14 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
             layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
+            cuboidsRowCount.putIfAbsent(layoutId, rowCount);
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short) shardNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index ef3325f..16fecd3 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -18,16 +18,13 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
@@ -35,28 +32,27 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
+import org.apache.kylin.engine.spark.utils.JobMetrics;
+import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
+import org.apache.kylin.engine.spark.utils.Metrics;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-import org.apache.kylin.engine.spark.application.SparkApplication;
-import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.engine.spark.utils.JobMetrics;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
-import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
 import scala.collection.JavaConversions;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class CubeMergeJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
 
@@ -165,14 +161,14 @@ public class CubeMergeJob extends SparkApplication {
             sourceCount += cuboid.getSourceRows();
         }
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
         ss.sparkContext().setJobDescription("merge layout " + layoutId);
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(), String.valueOf(layoutId));
         String tempPath = path + CubeBuildJob.TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         storage.saveTo(tempPath, dataset, ss);
 
@@ -191,9 +187,8 @@ public class CubeMergeJob extends SparkApplication {
         int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
         layout.setShardNum(partitionNum);
         cuboidShardNum.put(layoutId, (short)partitionNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
         ss.sparkContext().setJobDescription(null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId); // drop the listener registered before saveTo
 
         BuildUtils.fillCuboidInfo(layout, path);
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index dfb0d19..3130130 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.spark.utils
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -33,20 +32,18 @@ object JobMetricsUtils extends Logging {
   private val aggs = List(classOf[HashAggregateExec], classOf[SortAggregateExec], classOf[ObjectHashAggregateExec])
   private val joins = List(classOf[BroadcastHashJoinExec], classOf[ShuffledHashJoinExec], classOf[SortMergeJoinExec],
     classOf[BroadcastNestedLoopJoinExec], classOf[StreamingSymmetricHashJoinExec])
-  var sparkListener : SparkListener = _
+
+  private val executionIdToListener = new ConcurrentHashMap[String, QueryExecutionInterceptListener]()
+
   def collectMetrics(executionId: String): JobMetrics = {
     var metrics = new JobMetrics
-    val execution = QueryExecutionCache.getQueryExecution(executionId)
-    if (execution != null) {
-      metrics = collectOutputRows(execution.executedPlan)
+    val listener = executionIdToListener.getOrDefault(executionId,null)
+    if (listener != null && listener.queryExecution.isDefined) {
+      metrics = collectOutputRows(listener.queryExecution.get.executedPlan)
       logInfo(s"Collect output rows successfully. $metrics")
+    } else {
+      logInfo(s"Collect output rows failed.")
     }
-
-    // comment below source, because it always collect failed when using apache spark.
-
-    // else {
-    // logDebug(s"Collect output rows failed.")
-    //}
     metrics
   }
 
@@ -90,61 +87,17 @@ object JobMetricsUtils extends Logging {
     rowMetrics
   }
 
-  /**
-   * When using a custom spark which sent event which contain QueryExecution belongs to a specific N_EXECUTION_ID_KEY,
-   * kylin can cache QueryExecution object into QueryExecutionCache and collect metrics such as bytes/row count for a cuboid
-   *
-     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case e: PostQueryExecutionForKylin =>
-          val nExecutionId = e.localProperties.getProperty(QueryExecutionCache.N_EXECUTION_ID_KEY, "")
-          if (nExecutionId != "" && e.queryExecution != null) {
-            QueryExecutionCache.setQueryExecution(nExecutionId, e.queryExecution)
-          } else {
-            logWarning("executionIdStr is null, can't get QueryExecution from SQLExecution.")
-          }
-        case _ => // Ignore
-      }
-   */
-  def registerListener(ss: SparkSession): Unit = {
-    sparkListener = new SparkListener {
-
-      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case _ => // Ignore
-      }
-    }
-    ss.sparkContext.addSparkListener(sparkListener)
+  def registerQueryExecutionListener(ss: SparkSession, executionId: String): Unit = {
+    val listener = new QueryExecutionInterceptListener(executionId)
+    executionIdToListener.put(executionId, listener)
+    ss.listenerManager.register(listener)
   }
 
-  def unRegisterListener(ss: SparkSession) : Unit = {
-    if (sparkListener != null) {
-      ss.sparkContext.removeSparkListener(sparkListener)
+  def unRegisterQueryExecutionListener(ss: SparkSession, executionId: String) : Unit = {
+    val listener =  executionIdToListener.remove(executionId)
+    if (listener != null) {
+      ss.listenerManager.unregister(listener)
     }
   }
 }
 
-object QueryExecutionCache extends Logging {
-  val N_EXECUTION_ID_KEY = "kylin.query.execution.id"
-
-  private val executionIdToQueryExecution = new ConcurrentHashMap[String, QueryExecution]()
-
-  def getQueryExecution(executionId: String): QueryExecution = {
-    if (executionId != null) {
-      executionIdToQueryExecution.get(executionId)
-    } else {
-      null
-    }
-  }
-
-  def setQueryExecution(executionId: String, queryExecution: QueryExecution): Unit = {
-    if (executionId != null) {
-      executionIdToQueryExecution.put(executionId, queryExecution)
-    } else {
-      logWarning("kylin.query.execution.id is null, don't put QueryExecution into QueryExecutionCache.")
-    }
-  }
-
-  def removeQueryExecution(executionId: String): Unit = {
-    executionIdToQueryExecution.remove(executionId)
-  }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
new file mode 100644
index 0000000..d9d64ef
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
@@ -0,0 +1,33 @@
+package org.apache.kylin.engine.spark.utils
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.net.URI
+
+/**
+ * This QueryExecutionListener will intercept QueryExecution when outputPath matched, so make sure outputPath was unique.
+ */
+class QueryExecutionInterceptListener(outputPath: String) extends QueryExecutionListener{
+
+  var queryExecution : Option[QueryExecution] = None
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    qe.sparkPlan foreach {
+      case plan: DataWritingCommandExec =>{
+        //check if output path match
+        if (plan.cmd.isInstanceOf[InsertIntoHadoopFsRelationCommand]
+          && plan.cmd.asInstanceOf[InsertIntoHadoopFsRelationCommand].outputPath.toUri.equals(new URI(outputPath))) {
+          queryExecution = Some(qe)
+        }
+      }
+      case _ =>
+    }
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+
+  }
+}