Posted to commits@kylin.apache.org by xx...@apache.org on 2022/01/19 08:11:16 UTC
[kylin] 01/02: [KYLIN-5121] Make JobMetricsUtils.collectMetrics work again
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit dc3fd9ab25ecd03ec53f8855f5366db8de542ce5
Author: hujiahua <hu...@youzan.com>
AuthorDate: Thu Nov 11 14:39:03 2021 +0800
[KYLIN-5121] Make JobMetricsUtils.collectMetrics work again
---
.../kylin/engine/spark/job/OptimizeBuildJob.java | 28 ++++----
.../kylin/engine/spark/job/CubeBuildJob.java | 68 +++++++++----------
.../kylin/engine/spark/job/CubeMergeJob.java | 41 +++++------
.../kylin/engine/spark/utils/JobMetricsUtils.scala | 79 +++++-----------------
.../utils/QueryExecutionInterceptListener.scala | 33 +++++++++
5 files changed, 110 insertions(+), 139 deletions(-)
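In outline: the old flow tagged each write with a random UUID in a Spark local property and expected a custom SparkListener to fill QueryExecutionCache from PostQueryExecutionForKylin events, which only a Kylin-patched Spark emits, so collectMetrics always came up empty on vanilla Apache Spark (see the commented-out block removed from JobMetricsUtils.scala below). The patch instead registers a standard QueryExecutionListener per write and reuses the cuboid's unique temp output path as the execution id. A minimal sketch of the new call pattern (method names are from this patch; ss, dataset, storage and tempPath are assumed from the surrounding build code):

    // The temp output path is unique per cuboid, so it doubles as the execution id.
    val queryExecutionId = tempPath
    JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId)
    storage.saveTo(tempPath, dataset, ss)                    // the write the listener intercepts
    val metrics = JobMetricsUtils.collectMetrics(queryExecutionId)
    JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId)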
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
index 96ca190..8a412d5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/OptimizeBuildJob.java
@@ -41,16 +41,15 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
import org.apache.kylin.engine.spark.utils.JobMetrics;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.storage.StorageFactory;
@@ -63,16 +62,13 @@ import scala.Tuple2;
import scala.collection.JavaConversions;
import java.io.IOException;
-
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.ArrayList;
-import java.util.UUID;
-
import java.util.stream.Collectors;
public class OptimizeBuildJob extends SparkApplication {
@@ -378,15 +374,15 @@ public class OptimizeBuildJob extends SparkApplication {
long parentId) throws IOException {
long layoutId = layout.getId();
- // for spark metrics
- String queryExecutionId = UUID.randomUUID().toString();
- ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
NSparkCubingEngine.NSparkCubingStorage.class);
String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(),
String.valueOf(layoutId));
+
String tempPath = path + TEMP_DIR_SUFFIX;
+ // for spark metrics
+ String queryExecutionId = tempPath;
+ JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
// save to temp path
logger.info("Cuboids are saved to temp path : " + tempPath);
storage.saveTo(tempPath, dataset, ss);
@@ -402,14 +398,14 @@ public class OptimizeBuildJob extends SparkApplication {
cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
layout.setSourceRows(cuboidsRowCount.get(parentId));
} else {
+ cuboidsRowCount.putIfAbsent(layoutId, rowCount);
layout.setRows(rowCount);
layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
}
int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
layout.setShardNum(shardNum);
cuboidShardNum.put(layoutId, (short) shardNum);
- ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
- QueryExecutionCache.removeQueryExecution(queryExecutionId);
+ JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
BuildUtils.fillCuboidInfo(layout, path);
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
index c30c206..a5f0e11 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java
@@ -18,41 +18,21 @@
package org.apache.kylin.engine.spark.job;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.cuboid.CuboidModeEnum;
-import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.engine.mr.common.CubeStatsWriter;
-import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
-import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.shaded.com.google.common.base.Joiner;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.StatisticsDecisionUtil;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
@@ -66,9 +46,14 @@ import org.apache.kylin.engine.spark.utils.BuildUtils;
import org.apache.kylin.engine.spark.utils.JobMetrics;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
+import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.base.Joiner;
+import org.apache.kylin.shaded.com.google.common.base.Preconditions;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.Sets;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -77,14 +62,24 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.base.Preconditions;
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Sets;
-
import scala.Tuple2;
import scala.collection.JavaConversions;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
public class CubeBuildJob extends SparkApplication {
protected static final Logger logger = LoggerFactory.getLogger(CubeBuildJob.class);
protected static String TEMP_DIR_SUFFIX = "_temp";
@@ -457,15 +452,14 @@ public class CubeBuildJob extends SparkApplication {
long parentId) throws IOException {
long layoutId = layout.getId();
- // for spark metrics
- String queryExecutionId = UUID.randomUUID().toString();
- ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
-
NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
NSparkCubingEngine.NSparkCubingStorage.class);
String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(),
String.valueOf(layoutId));
String tempPath = path + TEMP_DIR_SUFFIX;
+ // for spark metrics
+ String queryExecutionId = tempPath;
+ JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
// save to temp path
logger.info("Cuboids are saved to temp path : " + tempPath);
storage.saveTo(tempPath, dataset, ss);
@@ -479,14 +473,14 @@ public class CubeBuildJob extends SparkApplication {
cuboidsRowCount.putIfAbsent(layoutId, cuboidRowCnt);
layout.setSourceRows(cuboidsRowCount.get(parentId));
} else {
+ cuboidsRowCount.putIfAbsent(layoutId, rowCount);
layout.setRows(rowCount);
layout.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
}
int shardNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, cubeInstance.getConfig(), ss);
layout.setShardNum(shardNum);
cuboidShardNum.put(layoutId, (short) shardNum);
- ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
- QueryExecutionCache.removeQueryExecution(queryExecutionId);
+ JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
BuildUtils.fillCuboidInfo(layout, path);
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
index ef3325f..16fecd3 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeMergeJob.java
@@ -18,16 +18,13 @@
package org.apache.kylin.engine.spark.job;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.engine.spark.NSparkCubingEngine;
+import org.apache.kylin.engine.spark.application.SparkApplication;
+import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
@@ -35,28 +32,27 @@ import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
+import org.apache.kylin.engine.spark.utils.BuildUtils;
+import org.apache.kylin.engine.spark.utils.JobMetrics;
+import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
+import org.apache.kylin.engine.spark.utils.Metrics;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.shaded.com.google.common.collect.Lists;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.shaded.com.google.common.collect.Lists;
-import org.apache.kylin.shaded.com.google.common.collect.Maps;
-
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-import org.apache.kylin.engine.spark.application.SparkApplication;
-import org.apache.kylin.engine.spark.builder.CubeMergeAssist;
-import org.apache.kylin.engine.spark.utils.BuildUtils;
-import org.apache.kylin.engine.spark.utils.JobMetrics;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
-import org.apache.kylin.engine.spark.utils.Metrics;
-import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
import scala.collection.JavaConversions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
public class CubeMergeJob extends SparkApplication {
protected static final Logger logger = LoggerFactory.getLogger(CubeMergeJob.class);
@@ -165,14 +161,14 @@ public class CubeMergeJob extends SparkApplication {
sourceCount += cuboid.getSourceRows();
}
- // for spark metrics
- String queryExecutionId = UUID.randomUUID().toString();
- ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), queryExecutionId);
ss.sparkContext().setJobDescription("merge layout " + layoutId);
NSparkCubingEngine.NSparkCubingStorage storage = StorageFactory.createEngineAdapter(layout,
NSparkCubingEngine.NSparkCubingStorage.class);
String path = PathManager.getParquetStoragePath(config, getParam(MetadataConstants.P_CUBE_NAME), seg.name(), seg.identifier(), String.valueOf(layoutId));
String tempPath = path + CubeBuildJob.TEMP_DIR_SUFFIX;
+ // for spark metrics
+ String queryExecutionId = tempPath;
+ JobMetricsUtils.registerQueryExecutionListener(ss, queryExecutionId);
// save to temp path
storage.saveTo(tempPath, dataset, ss);
@@ -191,9 +187,8 @@ public class CubeMergeJob extends SparkApplication {
int partitionNum = BuildUtils.repartitionIfNeed(layout, storage, path, tempPath, config, ss);
layout.setShardNum(partitionNum);
cuboidShardNum.put(layoutId, (short)partitionNum);
- ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
ss.sparkContext().setJobDescription(null);
- QueryExecutionCache.removeQueryExecution(queryExecutionId);
+ JobMetricsUtils.unRegisterQueryExecutionListener(ss, queryExecutionId);
BuildUtils.fillCuboidInfo(layout, path);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
index dfb0d19..3130130 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala
@@ -21,7 +21,6 @@ package org.apache.kylin.engine.spark.utils
import java.util.concurrent.ConcurrentHashMap
import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
@@ -33,20 +32,18 @@ object JobMetricsUtils extends Logging {
private val aggs = List(classOf[HashAggregateExec], classOf[SortAggregateExec], classOf[ObjectHashAggregateExec])
private val joins = List(classOf[BroadcastHashJoinExec], classOf[ShuffledHashJoinExec], classOf[SortMergeJoinExec],
classOf[BroadcastNestedLoopJoinExec], classOf[StreamingSymmetricHashJoinExec])
- var sparkListener : SparkListener = _
+
+ private val executionIdToListener = new ConcurrentHashMap[String, QueryExecutionInterceptListener]()
+
def collectMetrics(executionId: String): JobMetrics = {
var metrics = new JobMetrics
- val execution = QueryExecutionCache.getQueryExecution(executionId)
- if (execution != null) {
- metrics = collectOutputRows(execution.executedPlan)
+ val listener = executionIdToListener.get(executionId)
+ if (listener != null && listener.queryExecution.isDefined) {
+ metrics = collectOutputRows(listener.queryExecution.get.executedPlan)
logInfo(s"Collect output rows successfully. $metrics")
+ } else {
+ logInfo(s"Collect output rows failed.")
}
-
- // comment below source, because it always collect failed when using apache spark.
-
- // else {
- // logDebug(s"Collect output rows failed.")
- //}
metrics
}
@@ -90,61 +87,17 @@ object JobMetricsUtils extends Logging {
rowMetrics
}
- /**
- * When using a custom spark which sent event which contain QueryExecution belongs to a specific N_EXECUTION_ID_KEY,
- * kylin can cache QueryExecution object into QueryExecutionCache and collect metrics such as bytes/row count for a cuboid
- *
- override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
- case e: PostQueryExecutionForKylin =>
- val nExecutionId = e.localProperties.getProperty(QueryExecutionCache.N_EXECUTION_ID_KEY, "")
- if (nExecutionId != "" && e.queryExecution != null) {
- QueryExecutionCache.setQueryExecution(nExecutionId, e.queryExecution)
- } else {
- logWarning("executionIdStr is null, can't get QueryExecution from SQLExecution.")
- }
- case _ => // Ignore
- }
- */
- def registerListener(ss: SparkSession): Unit = {
- sparkListener = new SparkListener {
-
- override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
- case _ => // Ignore
- }
- }
- ss.sparkContext.addSparkListener(sparkListener)
+ def registerQueryExecutionListener(ss: SparkSession, executionId: String): Unit = {
+ val listener = new QueryExecutionInterceptListener(executionId)
+ executionIdToListener.put(executionId, listener)
+ ss.listenerManager.register(listener)
}
- def unRegisterListener(ss: SparkSession) : Unit = {
- if (sparkListener != null) {
- ss.sparkContext.removeSparkListener(sparkListener)
+ def unRegisterQueryExecutionListener(ss: SparkSession, executionId: String): Unit = {
+ val listener = executionIdToListener.remove(executionId)
+ if (listener != null) {
+ ss.listenerManager.unregister(listener)
}
}
}
-object QueryExecutionCache extends Logging {
- val N_EXECUTION_ID_KEY = "kylin.query.execution.id"
-
- private val executionIdToQueryExecution = new ConcurrentHashMap[String, QueryExecution]()
-
- def getQueryExecution(executionId: String): QueryExecution = {
- if (executionId != null) {
- executionIdToQueryExecution.get(executionId)
- } else {
- null
- }
- }
-
- def setQueryExecution(executionId: String, queryExecution: QueryExecution): Unit = {
- if (executionId != null) {
- executionIdToQueryExecution.put(executionId, queryExecution)
- } else {
- logWarning("kylin.query.execution.id is null, don't put QueryExecution into QueryExecutionCache.")
- }
- }
-
- def removeQueryExecution(executionId: String): Unit = {
- executionIdToQueryExecution.remove(executionId)
- }
-
-}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
new file mode 100644
index 0000000..d9d64ef
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/QueryExecutionInterceptListener.scala
@@ -0,0 +1,33 @@
+package org.apache.kylin.engine.spark.utils
+
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import java.net.URI
+
+/**
+ * This QueryExecutionListener intercepts the QueryExecution whose write targets the given outputPath, so make sure the outputPath is unique.
+ */
+class QueryExecutionInterceptListener(outputPath: String) extends QueryExecutionListener {
+
+ // written from the listener callback, read from the build thread, hence @volatile
+ @volatile var queryExecution: Option[QueryExecution] = None
+
+ override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+ qe.sparkPlan.foreach {
+ case plan: DataWritingCommandExec =>
+ // check whether the write command targets the expected output path
+ plan.cmd match {
+ case cmd: InsertIntoHadoopFsRelationCommand if cmd.outputPath.toUri.equals(new URI(outputPath)) =>
+ queryExecution = Some(qe)
+ case _ =>
+ }
+ case _ =>
+ }
+ }
+
+ override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+ // no-op: metrics are only collected for successful writes
+ }
+}
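For reference, the new listener can also be exercised standalone. A hedged sketch (the local session, path and sleep are illustrative, not part of the patch; the listener compares fully qualified URIs, so pass the path in the form Spark qualifies it to):

    import org.apache.kylin.engine.spark.utils.QueryExecutionInterceptListener
    import org.apache.spark.sql.SparkSession

    val ss = SparkSession.builder().master("local[1]").getOrCreate()
    val outputPath = "file:/tmp/demo-cuboid_temp"   // hypothetical qualified path
    val listener = new QueryExecutionInterceptListener(outputPath)
    ss.listenerManager.register(listener)
    // A parquet save plans DataWritingCommandExec over InsertIntoHadoopFsRelationCommand,
    // which onSuccess matches against outputPath.
    ss.range(10).write.mode("overwrite").parquet(outputPath)
    Thread.sleep(2000)   // crude: some Spark versions deliver the callback asynchronously
    listener.queryExecution.foreach(qe => println(qe.executedPlan))
    ss.listenerManager.unregister(listener)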