You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/01/19 08:11:16 UTC

[kylin] 01/02: [KYLIN-5121] Make JobMetricsUtils.collectMetrics be working again

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dc3fd9ab25ecd03ec53f8855f5366db8de542ce5
Author: hujiahua <hu...@youzan.com>
AuthorDate: Thu Nov 11 14:39:03 2021 +0800

    [KYLIN-5121] Make JobMetricsUtils.collectMetrics be working again
---
 .../kylin/engine/spark/job/OptimizeBuildJob.java   | 28 ++++----
 .../kylin/engine/spark/job/CubeBuildJob.java       | 68 +++++++++----------
 .../kylin/engine/spark/job/CubeMergeJob.java       | 41 +++++------
 .../kylin/engine/spark/utils/JobMetricsUtils.scala | 79 +++++-----------------
 .../utils/QueryExecutionInterceptListener.scala    | 33 +++++++++
 5 files changed, 110 insertions(+), 139 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 96ca190..8a412d5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -41,16 +41,15 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
@@ -63,16 +62,13 @@ import scala.Tuple2;
 import scala.collection.JavaConversions;
 
 import java.io.IOException;
-
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.ArrayList;
-import java.util.UUID;
-
 import java.util.stream.Collectors;
 
 public class OptimizeBuildJob extends SparkApplication {
@@ -378,15 +374,15 @@ public class OptimizeBuildJob extends SparkApplication {
                                      long parentId) throws IOException {
         long layoutId = layout.getId();
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(),
                 String.valueOf(layoutId));
+
         String tempPath = path + TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         logger.info("Cuboids are saved to temp path : " + tempPath);
         storage.saveTo(tempPath, dataset, ss);
@@ -402,14 +398,14 @@ public class OptimizeBuildJob extends SparkApplication {
             cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
             layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
+            cuboidsRowCount.putIfAbsent(layoutId, rowCount);
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short) shardNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index c30c206..a5f0e11 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -18,41 +18,21 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.cuboid.CuboidModeEnum;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
-import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.shaded.com.google.common.base.Joiner;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
 import org.apache.kylin.engine.spark.NSparkCubingEngine;
 import org.apache.kylin.engine.spark.application.SparkApplication;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
@@ -66,9 +46,14 @@ import org.apache.kylin.engine.spark.utils.BuildUtils;
 import org.apache.kylin.engine.spark.utils.JobMetrics;
 import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
 import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
+import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -77,14 +62,24 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
 import scala.Tuple2;
 import scala.collection.JavaConversions;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
 public class CubeBuildJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
     protected static String TEMP_DIR_SUFFIX = "_temp";
@@ -457,15 +452,14 @@ public class CubeBuildJob extends SparkApplication {
                                      long parentId) throws IOException {
         long layoutId = layout.getId();
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(),
                 String.valueOf(layoutId));
         String tempPath = path + TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         logger.info("Cuboids are saved to temp path : " + tempPath);
         storage.saveTo(tempPath, dataset, ss);
@@ -479,14 +473,14 @@ public class CubeBuildJob extends SparkApplication {
             cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
             layout.setSourceRows(cuboidsRowCount.get(parentId));
         } else {
+            cuboidsRowCount.putIfAbsent(layoutId, rowCount);
             layout.setRows(rowCount);
             layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
         }
         int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
         layout.setShardNum(shardNum);
         cuboidShardNum.put(layoutId, (short) shardNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
         BuildUtils.fillCuboidInfo(layout, path);
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index ef3325f..16fecd3 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -18,16 +18,13 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
 import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
 import org.apache.kylin.engine.spark.metadata.SegmentInfo;
 import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
@@ -35,28 +32,27 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
 import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
 import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
 import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
+import org.apache.kylin.engine.spark.utils.JobMetrics;
+import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
+import org.apache.kylin.engine.spark.utils.Metrics;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.storage.StorageFactory;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-import org.apache.kylin.engine.spark.application.SparkApplication;
-import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.engine.spark.utils.JobMetrics;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
-import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
 import scala.collection.JavaConversions;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class CubeMergeJob extends SparkApplication {
     protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
 
@@ -165,14 +161,14 @@ public class CubeMergeJob extends SparkApplication {
             sourceCount += cuboid.getSourceRows();
         }
 
-        // for spark metrics
-        String queryExecutionId = UUID.randomUUID().toString();
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
         ss.sparkContext().setJobDescription("merge layout " + layoutId);
         NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
                 NSparkCubingEngine.NSparkCubingStorage.class);
         String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(), String.valueOf(layoutId));
         String tempPath = path + CubeBuildJob.TEMP_DIR_SUFFIX;
+        // for spark metrics
+        String queryExecutionId = tempPath;
+        JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
         // save to temp path
         storage.saveTo(tempPath, dataset, ss);
 
@@ -191,9 +187,8 @@ public class CubeMergeJob extends SparkApplication {
         int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
         layout.setShardNum(partitionNum);
         cuboidShardNum.put(layoutId, (short)partitionNum);
-        ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
         ss.sparkContext().setJobDescription(null);
-        QueryExecutionCache.removeQueryExecution(queryExecutionId);
+        JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
 
         BuildUtils.fillCuboidInfo(layout, path);
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index dfb0d19..3130130 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.spark.utils
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -33,20 +32,18 @@ object JobMetricsUtils extends Logging {
   private val aggs = List(classOf[HashAggregateExec], classOf[SortAggregateExec], classOf[ObjectHashAggregateExec])
   private val joins = List(classOf[BroadcastHashJoinExec], classOf[ShuffledHashJoinExec], classOf[SortMergeJoinExec],
     classOf[BroadcastNestedLoopJoinExec], classOf[StreamingSymmetricHashJoinExec])
-  var sparkListener : SparkListener = _
+
+  private val executionIdToListener = new ConcurrentHashMap[String, QueryExecutionInterceptListener]()
+
   def collectMetrics(executionId: String): JobMetrics = {
     var metrics = new JobMetrics
-    val execution = QueryExecutionCache.getQueryExecution(executionId)
-    if (execution != null) {
-      metrics = collectOutputRows(execution.executedPlan)
+    val listener = executionIdToListener.getOrDefault(executionId,null)
+    if (listener != null && listener.queryExecution.isDefined) {
+      metrics = collectOutputRows(listener.queryExecution.get.executedPlan)
       logInfo(s"Collect output rows successfully. $metrics")
+    } else {
+      logInfo(s"Collect output rows failed.")
     }
-
-    // comment below source, because it always collect failed when using apache spark.
-
-    // else {
-    // logDebug(s"Collect output rows failed.")
-    //}
     metrics
   }
 
@@ -90,61 +87,17 @@ object JobMetricsUtils extends Logging {
     rowMetrics
   }
 
-  /**
-   * When using a custom spark which sent event which contain QueryExecution belongs to a specific N_EXECUTION_ID_KEY,
-   * kylin can cache QueryExecution object into QueryExecutionCache and collect metrics such as bytes/row count for a cuboid
-   *
-     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case e: PostQueryExecutionForKylin =>
-          val nExecutionId = e.localProperties.getProperty(QueryExecutionCache.N_EXECUTION_ID_KEY, "")
-          if (nExecutionId != "" && e.queryExecution != null) {
-            QueryExecutionCache.setQueryExecution(nExecutionId, e.queryExecution)
-          } else {
-            logWarning("executionIdStr is null, can't get QueryExecution from SQLExecution.")
-          }
-        case _ => // Ignore
-      }
-   */
-  def registerListener(ss: SparkSession): Unit = {
-    sparkListener = new SparkListener {
-
-      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-        case _ => // Ignore
-      }
-    }
-    ss.sparkContext.addSparkListener(sparkListener)
+  def registerQueryExecutionListener(ss: SparkSession, executionId: String): Unit = {
+    val listener = new QueryExecutionInterceptListener(executionId)
+    executionIdToListener.put(executionId, listener)
+    ss.listenerManager.register(listener)
   }
 
-  def unRegisterListener(ss: SparkSession) : Unit = {
-    if (sparkListener != null) {
-      ss.sparkContext.removeSparkListener(sparkListener)
+  def unRegisterQueryExecutionListener(ss: SparkSession, executionId: String) : Unit = {
+    val listener =  executionIdToListener.remove(executionId)
+    if (listener != null) {
+      ss.listenerManager.unregister(listener)
     }
   }
 }
 
-object QueryExecutionCache extends Logging {
-  val N_EXECUTION_ID_KEY = "kylin.query.execution.id"
-
-  private val executionIdToQueryExecution = new ConcurrentHashMap[String, QueryExecution]()
-
-  def getQueryExecution(executionId: String): QueryExecution = {
-    if (executionId != null) {
-      executionIdToQueryExecution.get(executionId)
-    } else {
-      null
-    }
-  }
-
-  def setQueryExecution(executionId: String, queryExecution: QueryExecution): Unit = {
-    if (executionId != null) {
-      executionIdToQueryExecution.put(executionId, queryExecution)
-    } else {
-      logWarning("kylin.query.execution.id is null, don't put QueryExecution into QueryExecutionCache.")
-    }
-  }
-
-  def removeQueryExecution(executionId: String): Unit = {
-    executionIdToQueryExecution.remove(executionId)
-  }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
new file mode 100644
index 0000000..d9d64ef
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
@@ -0,0 +1,33 @@
+package org.apache.kylin.engine.spark.utils
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.net.URI
+
+/**
+ * Keeps the QueryExecution of the successful write whose output path equals `outputPath`, so
+ * `outputPath` must be unique. Listeners may be invoked from other threads, hence @volatile.
+ */
+class QueryExecutionInterceptListener(outputPath: String) extends QueryExecutionListener {
+  private lazy val expectedUri = new URI(outputPath) // parsed on first use, then reused
+  @volatile var queryExecution: Option[QueryExecution] = None
+
+  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+    qe.sparkPlan foreach {
+      case plan: DataWritingCommandExec =>
+        plan.cmd match {
+          case cmd: InsertIntoHadoopFsRelationCommand if cmd.outputPath.toUri == expectedUri =>
+            queryExecution = Some(qe)
+          case _ => // a write to some other location
+        }
+      case _ => // not a data-writing node
+    }
+  }
+
+  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+    // Nothing is captured for a failed execution.
+  }
+}