You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/05/06 06:59:14 UTC
[kylin] 13/38: KYLIN-5528 support collect sparder gc
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 842a0601a643ac83d894ec8a248673c8fd13cfb5
Author: Yaguang Jia <ji...@foxmail.com>
AuthorDate: Thu Feb 23 18:00:57 2023 +0800
KYLIN-5528 support collect sparder gc
* KYLIN-5528 support collect sparder gc
* KYLIN-5528 add gc option to executor extraJavaOption
* KYLIN-5528 change timeout to 30s
---
.../src/main/resources/config/init.properties | 2 +-
.../apache/kylin/rest/service/SystemService.java | 5 +
.../org/apache/kylin/common/KylinConfigBase.java | 14 +++
.../src/main/resources/kylin-defaults0.properties | 2 +-
.../src/main/resources/config/init.properties | 2 +-
.../src/main/resources/config/init.properties | 2 +-
.../kylin/rest/controller/NQueryController.java | 2 +-
.../src/main/resources/config/init.properties | 2 +-
.../engine/spark/job/NSparkExecutableTest.java | 15 ++-
.../asyncprofiler/AsyncProfiling.scala | 2 +-
.../QueryAsyncProfilerDriverPlugin.scala | 2 +-
.../QueryAsyncProfilerSparkPlugin.scala | 2 +-
.../diagnose/DiagnoseConstant.scala} | 21 +++-
.../diagnose/DiagnoseDriverPlugin.scala} | 50 +++++----
.../plugin/diagnose/DiagnoseExecutorPlugin.scala | 124 +++++++++++++++++++++
.../query/plugin/diagnose/DiagnoseHelper.scala | 62 +++++++++++
.../diagnose/DiagnoseSparkPlugin.scala} | 9 +-
.../scala/org/apache/spark/sql/KylinSession.scala | 56 ++++++----
.../scala/org/apache/spark/sql/SparderEnv.scala | 5 +-
.../SparkPluginWithMeta.scala} | 4 +-
.../asyncprofiler/AsyncProfilingTest.scala | 5 +-
.../QueryAsyncProfilerSparkPluginTest.scala | 2 +-
.../QueryProfilerDriverPluginTest.scala} | 5 +-
.../diagnose/DiagnoseExecutorPluginTest.scala | 109 ++++++++++++++++++
.../query/plugin/diagnose/DiagnosePluginTest.scala | 82 ++++++++++++++
25 files changed, 506 insertions(+), 80 deletions(-)
diff --git a/src/common-booter/src/main/resources/config/init.properties b/src/common-booter/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/common-booter/src/main/resources/config/init.properties
+++ b/src/common-booter/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
# for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
index fa467e414d..25c961be8c 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
@@ -60,6 +60,7 @@ import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.plugin.diagnose.DiagnoseHelper;
import org.apache.kylin.rest.constant.Constant;
import org.apache.kylin.rest.request.BackupRequest;
import org.apache.kylin.rest.request.DiagProgressRequest;
@@ -138,6 +139,8 @@ public class SystemService extends BasicService {
String[] arguments;
// full
if (StringUtils.isEmpty(jobId) && StringUtils.isEmpty(queryId)) {
+ // Sparder executor gc log should be collected before FULL and QUERY diag package
+ DiagnoseHelper.collectSparderExecutorGc();
if (startTime == null && endTime == null) {
startTime = Long.toString(System.currentTimeMillis() - 259200000L);
endTime = Long.toString(System.currentTimeMillis());
@@ -153,6 +156,8 @@ public class SystemService extends BasicService {
arguments = new String[] { jobOpt, jobId, "-destDir", exportFile.getAbsolutePath(), "-diagId", uuid };
diagPackageType = JOB;
} else { //query
+ // Sparder executor gc log should be collected before FULL and QUERY diag package
+ DiagnoseHelper.collectSparderExecutorGc();
arguments = new String[] { "-project", project, "-query", queryId, "-destDir", exportFile.getAbsolutePath(),
"-diagId", uuid };
diagPackageType = QUERY;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 32af04760a..0490ce7b84 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2565,6 +2565,10 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.job.notification-on-empty-data-load", FALSE));
}
+ public boolean getJobErrorNotificationEnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.job.notification-on-job-error", FALSE));
+ }
+
public Long getStorageResourceSurvivalTimeThreshold() {
return TimeUtil.timeStringAs(this.getOptional("kylin.storage.resource-survival-time-threshold", "7d"),
TimeUnit.MILLISECONDS);
@@ -3812,6 +3816,16 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.build.allow-non-strict-count-check", FALSE));
}
+ public long queryDiagnoseCollectionTimeout() {
+ return TimeUtil.timeStringAs(getOptional("kylin.query.diagnose-collection-timeout", "30s"),
+ TimeUnit.MILLISECONDS);
+ }
+
+ public boolean queryDiagnoseEnable() {
+ return !Boolean.parseBoolean(System.getProperty("spark.local", FALSE))
+ && Boolean.parseBoolean(getOptional("kylin.query.diagnose-enabled", TRUE));
+ }
+
// ============================================================================
// Cost based index Planner
// ============================================================================
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index cbacd5d78a..9aee2acdda 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -115,7 +115,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
# for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/data-loading-booter/src/main/resources/config/init.properties b/src/data-loading-booter/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/data-loading-booter/src/main/resources/config/init.properties
+++ b/src/data-loading-booter/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
# for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/query-booter/src/main/resources/config/init.properties b/src/query-booter/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/query-booter/src/main/resources/config/init.properties
+++ b/src/query-booter/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
# for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
index ea088d38a8..21936b8faf 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
@@ -71,7 +71,7 @@ import org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNo
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.metadata.query.QueryHistoryRequest;
import org.apache.kylin.metadata.query.util.QueryHisTransformStandardUtil;
-import org.apache.kylin.query.asyncprofiler.AsyncProfiling;
+import org.apache.kylin.query.plugin.asyncprofiler.AsyncProfiling;
import org.apache.kylin.rest.cluster.ClusterManager;
import org.apache.kylin.rest.request.SQLFormatRequest;
import org.apache.kylin.rest.response.QueryStatisticsResponse;
diff --git a/src/server/src/main/resources/config/init.properties b/src/server/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/server/src/main/resources/config/init.properties
+++ b/src/server/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
# for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
index ce5a26c72d..2fcf39b87d 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
@@ -29,6 +29,8 @@ import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.metadata.model.NDataModel;
import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
+import org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -152,12 +154,11 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
Assert.assertNotNull(cmd);
- Assert.assertTrue(
- cmd.contains("spark.plugins=org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin"));
+ Assert.assertTrue(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
}
overwriteSystemProp("kylin.engine.spark-conf.spark.plugins",
- "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin");
+ QueryAsyncProfilerSparkPlugin.class.getCanonicalName());
{
val desc = sparkExecutable.getSparkAppDesc();
desc.setHadoopConfDir(hadoopConf);
@@ -166,9 +167,8 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
Assert.assertNotNull(cmd);
- Assert.assertTrue(
- cmd.contains("spark.plugins=org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin,"
- + "org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin"));
+ Assert.assertTrue(cmd.contains("spark.plugins=" + QueryAsyncProfilerSparkPlugin.class.getCanonicalName()
+ + "," + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
}
overwriteSystemProp("kylin.engine.async-profiler-enabled", "false");
@@ -180,8 +180,7 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
Assert.assertNotNull(cmd);
- Assert.assertFalse(
- cmd.contains("spark.plugins=org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin"));
+ Assert.assertFalse(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
}
overwriteSystemProp("kylin.engine.spark-conf.spark.driver.extraJavaOptions",
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala
similarity index 98%
rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala
index d27cfc76f5..8147ba94ec 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
import org.apache.kylin.common.KylinConfig
import org.apache.kylin.common.asyncprofiler.Message._
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
similarity index 97%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
index 8af9631561..f8fe0e38a6 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool
import org.apache.spark.SparkContext
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
similarity index 95%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
index fd4306d3af..a2e4af8554 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala
similarity index 66%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala
index fd4306d3af..98c3cbf585 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala
@@ -16,14 +16,23 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.diagnose
-import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
-import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}
+object DiagnoseConstant {
+ // state
+ val STATE_WAIT = "STATE_WAIT"
+ val STATE_COLLECT = "STATE_COLLECT"
-class QueryAsyncProfilerSparkPlugin extends SparkPlugin {
+ // executor message
+ val NEXTCMD = "NEXTCMD"
+ val SENDRESULT = "SENDRESULT"
+ val HDFSDIR = "HDFSDIR"
- override def driverPlugin(): DriverPlugin = new QueryAsyncProfilerDriverPlugin
+ // driver message
+ val NOP = "NOP"
+ val COLLECT = "COLLECT"
+
+ // empty
+ val EMPTY = ""
- override def executorPlugin(): ExecutorPlugin = new AsyncProfilerExecutorPlugin
}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala
similarity index 51%
rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala
index 8af9631561..fed11cd45e 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala
@@ -16,34 +16,36 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.diagnose
-import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.api.plugin.DriverPlugin
import org.apache.spark.internal.Logging
-import java.util
-
-class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
-
- override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = {
- // Sparder Driver and KE are always in one JVM, in client mode
- AsyncProfilerTool.loadAsyncProfilerLib(true)
- super.init(sc, pluginContext)
- }
-
+class DiagnoseDriverPlugin extends DriverPlugin with Logging {
override def receive(message: Any): AnyRef = {
- import org.apache.kylin.common.asyncprofiler.Message._
+ message match {
+ case DiagnoseConstant.NEXTCMD =>
+ getNextCommand()
+ case DiagnoseConstant.HDFSDIR =>
+ KylinConfig.getInstanceFromEnv.getHdfsWorkingDirectory
+ case DiagnoseConstant.SENDRESULT =>
+ countDownGcResult()
+ case _ => DiagnoseConstant.EMPTY
+ }
+ }
- val (command, executorId, param) = processMessage(message.toString)
- command match {
- case NEXT_COMMAND =>
- AsyncProfiling.nextCommand()
- case RESULT =>
- AsyncProfiling.cacheExecutorResult(param, executorId)
- ""
- case _ => ""
+ def getNextCommand(): AnyRef = {
+ if (DiagnoseConstant.STATE_COLLECT.equals(DiagnoseHelper.state)) {
+ DiagnoseConstant.COLLECT
+ } else {
+ DiagnoseConstant.NOP
}
}
-}
\ No newline at end of file
+
+ def countDownGcResult(): AnyRef = {
+ DiagnoseHelper.countDownGcResult()
+ DiagnoseConstant.EMPTY
+ }
+
+}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
new file mode 100644
index 0000000000..c6c811b7c3
--- /dev/null
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.kylin.common.util.ExecutorServiceUtil
+import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
+import org.apache.spark.internal.Logging
+import org.joda.time.DateTime
+
+import java.io.File
+import java.util
+import java.util.concurrent.{Executors, TimeUnit}
+
+class DiagnoseExecutorPlugin extends ExecutorPlugin with Logging {
+
+ private val SPARDER_LOG: String = "_sparder_logs"
+ private val LOCAL_GC_FILE_PREFIX: String = "gc"
+ private val DATE_PATTERN = "yyyy-MM-dd"
+ private val checkingInterval: Long = 10000L
+ private val configuration: Configuration = new Configuration()
+ private val fileSystem: FileSystem = FileSystem.get(configuration)
+
+ private val scheduledExecutorService = Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Diagnose-%d").build())
+
+ private var state = DiagnoseConstant.STATE_WAIT
+ private var curContainerDir = new File(".")
+ private var sparderLogDir: String = ""
+ private var ctx: PluginContext = _
+
+ override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
+ ctx = _ctx
+ val diagnose = new Runnable {
+ override def run(): Unit = checkAndDiagnose()
+ }
+ logInfo("Diagnose executor plugin is initializing ...")
+ scheduledExecutorService.scheduleWithFixedDelay(
+ diagnose, 0, checkingInterval, TimeUnit.MILLISECONDS)
+ }
+
+ def checkAndDiagnose(): Unit = {
+ try {
+ val replay: AnyRef = ctx.ask(DiagnoseConstant.NEXTCMD)
+ logDebug(s"Executor ${ctx.executorID()} get replay $replay from driver ...")
+ replay match {
+ case DiagnoseConstant.COLLECT =>
+ if (DiagnoseConstant.STATE_WAIT.equals(state)) {
+ state = DiagnoseConstant.STATE_COLLECT
+ logDebug(s"Set executor state to $state")
+ collectGcLog()
+ ctx.send(DiagnoseConstant.SENDRESULT)
+ }
+ case DiagnoseConstant.NOP =>
+ if (!DiagnoseConstant.STATE_WAIT.equals(state)) {
+ state = DiagnoseConstant.STATE_WAIT
+ logDebug(s"Set executor state to $state")
+ }
+ case _ => ""
+ }
+ } catch {
+ case e: Exception =>
+ logInfo("Error during diagnose communication with driver", e)
+ }
+ }
+
+ def collectGcLog(): Unit = {
+ logDebug(s"Collecting sparder gc log file ...")
+ if (sparderLogDir.isEmpty) {
+ val reply = ctx.ask(DiagnoseConstant.HDFSDIR).toString
+ if (reply.isEmpty) {
+ logWarning(s"Cannot get kylin working dir, will not collect sparder executor gc log.")
+ return
+ } else {
+ sparderLogDir = reply + SPARDER_LOG
+ logInfo(s"HDFS sparder log dir is setting to ${sparderLogDir}")
+ }
+ }
+ val filePath = sparderLogDir + File.separator + new DateTime().toString(DATE_PATTERN) +
+ File.separator + ctx.conf().getAppId + File.separator
+ val fileNamePrefix = "executor-%s-".format(ctx.executorID())
+
+ curContainerDir.listFiles().filter(file => file.getName.startsWith(LOCAL_GC_FILE_PREFIX))
+ .map(file => copyLocalFileToHdfs(new Path(file.getAbsolutePath), new Path(filePath, fileNamePrefix + file.getName)))
+ }
+
+ def copyLocalFileToHdfs(local: Path, hdfs: Path): Unit = {
+ logInfo(s"Local gc file path is: ${local}, target hdfs file is: ${hdfs}")
+ fileSystem.copyFromLocalFile(local, hdfs)
+ }
+
+ override def shutdown(): Unit = {
+ ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3)
+ super.shutdown()
+ }
+
+ // for test only
+ def setCtx(_ctx: PluginContext): Unit = {
+ ctx = _ctx
+ }
+
+ // for test only
+ def setContainerDir(_curContainerDir: File): Unit = {
+ curContainerDir = _curContainerDir
+ }
+}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala
new file mode 100644
index 0000000000..9e0c3d2133
--- /dev/null
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparderEnv
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+object DiagnoseHelper extends Logging {
+ var state: String = DiagnoseConstant.STATE_WAIT
+ var resultCollectionTimeout: Long = KylinConfig.getInstanceFromEnv.queryDiagnoseCollectionTimeout
+ var gcResult: CountDownLatch = _
+
+ // activeExecutorCount will be set for UT
+ var activeExecutorCount: Int = 0
+
+ def collectSparderExecutorGc(): Unit = {
+ if (!KylinConfig.getInstanceFromEnv.isUTEnv) {
+ activeExecutorCount = SparderEnv.getActiveExecutorIds.size
+ }
+ initGcCount(activeExecutorCount)
+
+ setState(DiagnoseConstant.STATE_COLLECT)
+
+ if (gcResult.await(resultCollectionTimeout, TimeUnit.MILLISECONDS)) {
+ logInfo("All executor gc logs have been uploaded to hdfs")
+ } else {
+ logWarning("Timeout while waiting for gc log result")
+ }
+ setState(DiagnoseConstant.STATE_WAIT)
+ }
+
+ def initGcCount(count: Int): Unit = {
+ gcResult = new CountDownLatch(count)
+ }
+
+ def setState(_state: String): Unit = {
+ DiagnoseHelper.state = _state
+ }
+
+ def countDownGcResult(): Unit = {
+ gcResult.countDown()
+ }
+}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala
similarity index 72%
rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala
index fd4306d3af..52a90e93de 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala
@@ -16,14 +16,13 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.diagnose
-import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}
-class QueryAsyncProfilerSparkPlugin extends SparkPlugin {
+class DiagnoseSparkPlugin extends SparkPlugin {
- override def driverPlugin(): DriverPlugin = new QueryAsyncProfilerDriverPlugin
+ override def driverPlugin(): DriverPlugin = new DiagnoseDriverPlugin
- override def executorPlugin(): ExecutorPlugin = new AsyncProfilerExecutorPlugin
+ override def executorPlugin(): ExecutorPlugin = new DiagnoseExecutorPlugin
}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
index 1f6ad0ebba..2ebf1d4826 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -18,28 +18,28 @@
package org.apache.spark.sql
-import java.io._
-import java.net.URI
-import java.nio.file.Paths
-
-import scala.collection.JavaConverters._
-
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kylin.common.{KapConfig, KylinConfig}
import org.apache.kylin.common.util.{HadoopUtil, Unsafe}
+import org.apache.kylin.common.{KapConfig, KylinConfig}
import org.apache.kylin.metadata.query.BigQueryThresholdUpdater
+import org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin
+import org.apache.kylin.query.plugin.diagnose.DiagnoseSparkPlugin
import org.apache.kylin.query.util.ExtractFactory
-import org.springframework.expression.common.TemplateParserContext
-import org.springframework.expression.spel.standard.SpelExpressionParser
-
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.SparkSession.Builder
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, StaticSQLConf}
+import org.apache.spark.sql.internal.{SQLConf, SessionState, SharedState, StaticSQLConf}
import org.apache.spark.sql.udf.UdfManager
import org.apache.spark.util.{KylinReflectUtils, Utils}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.springframework.expression.common.TemplateParserContext
+import org.springframework.expression.spel.standard.SpelExpressionParser
+
+import java.io._
+import java.net.URI
+import java.nio.file.Paths
+import scala.collection.JavaConverters._
class KylinSession(
@transient val sc: SparkContext,
@@ -112,6 +112,7 @@ class KylinSession(
object KylinSession extends Logging {
val NORMAL_FAIR_SCHEDULER_FILE_NAME: String = "/fairscheduler.xml"
val QUERY_LIMIT_FAIR_SCHEDULER_FILE_NAME: String = "/query-limit-fair-scheduler.xml"
+ val SPARK_PLUGINS_KEY = "spark.plugins"
implicit class KylinBuilder(builder: Builder) {
var queryCluster: Boolean = true
@@ -328,16 +329,30 @@ object KylinSession extends Logging {
}
}
+ checkAndSetSparkPlugins(sparkConf)
+
+ sparkConf
+ }
+
+ def checkAndSetSparkPlugins(sparkConf: SparkConf): Unit = {
+ // Sparder diagnose plugin
+ if (kapConfig.getKylinConfig.queryDiagnoseEnable()) {
+ addSparkPlugin(sparkConf, classOf[DiagnoseSparkPlugin].getCanonicalName)
+ }
+
+ // Query profile plugin
if (kapConfig.getKylinConfig.asyncProfilingEnabled()) {
- val plugins = sparkConf.get("spark.plugins", "")
- if (plugins.isEmpty) {
- sparkConf.set("spark.plugins", "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin")
- } else {
- sparkConf.set("spark.plugins", "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin," + plugins)
- }
+ addSparkPlugin(sparkConf, classOf[QueryAsyncProfilerSparkPlugin].getCanonicalName)
}
+ }
- sparkConf
+ def addSparkPlugin(sparkConf: SparkConf, pluginName: String): Unit = {
+ val plugins = sparkConf.get(SPARK_PLUGINS_KEY, "")
+ if (plugins.isEmpty) {
+ sparkConf.set(SPARK_PLUGINS_KEY, pluginName)
+ } else {
+ sparkConf.set(SPARK_PLUGINS_KEY, pluginName + "," + plugins)
+ }
}
def buildCluster(): KylinBuilder = {
@@ -411,7 +426,8 @@ object KylinSession extends Logging {
val parser = new SpelExpressionParser()
val parserCtx = new TemplateParserContext()
while ( {
- templateLine = fileReader.readLine(); templateLine != null
+ templateLine = fileReader.readLine();
+ templateLine != null
}) {
processedLine = parser.parseExpression(templateLine, parserCtx).getValue(params, classOf[String]) + "\r\n"
fileWriter.write(processedLine)
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 59ce599fce..cfb469d547 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -346,5 +346,8 @@ object SparderEnv extends Logging {
configuration
}
-
+ // Return the list of currently active executors
+ def getActiveExecutorIds(): Seq[String] = {
+ getSparkSession.sparkContext.getExecutorIds()
+ }
}
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala
similarity index 94%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala
index 9db5d72000..064fb4516f 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin
import org.apache.kylin.common.util.NLocalFileMetadataTestCase
import org.apache.spark.{SparkContext, SparkFunSuite}
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
import java.io.File
-trait AsyncPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll {
+trait SparkPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll {
@transient var sc: SparkContext = _
protected val ut_meta = "../examples/test_case_data/localmeta"
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala
similarity index 94%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala
index 33f497ee33..81188e5aef 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala
@@ -16,9 +16,10 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.{SparkConf, SparkContext}
@@ -26,7 +27,7 @@ import org.mockito.Mockito.mock
import java.io.{File, OutputStream}
-class AsyncProfilingTest extends AsyncPluginWithMeta {
+class AsyncProfilingTest extends SparkPluginWithMeta {
val sparkPluginName: String = classOf[BuildAsyncProfilerSparkPlugin].getName
val flagFileDir: String = System.getProperty("java.io.tmpdir") + "default/jobStepId/"
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
similarity index 96%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
index 1ecbbe6a23..d0e21562cf 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
import org.junit.Assert
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala
similarity index 92%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala
index 44c564b3ba..74db3cc3a1 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala
@@ -16,13 +16,14 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Assert
-class QueryAsyncProfilerDriverPluginTest extends AsyncPluginWithMeta {
+class QueryAsyncProfilerDriverPluginTest extends SparkPluginWithMeta {
val sparkPluginName: String = classOf[QueryAsyncProfilerSparkPlugin].getName
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala
new file mode 100644
index 0000000000..c725c3f83b
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
+import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Assert
+
+import java.nio.file.Files
+import java.util
+
+class DiagnoseExecutorPluginTest extends SparkPluginWithMeta {
+
+ val sparkPluginName: String = classOf[DiagnoseSparkPlugin].getName
+ val executorPluginTest = new DiagnoseExecutorPlugin()
+ val mockPluginCtx = new MockPluginCtx
+ val tempContainerDir = Files.createTempDirectory("PluginContainerTest")
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val conf = new SparkConf()
+ .setAppName(getClass.getName)
+ .set(SparkLauncher.SPARK_MASTER, "local[1]")
+ .set("spark.plugins", sparkPluginName)
+ sc = new SparkContext(conf)
+
+ mockPluginCtx.sparkConf = sc.getConf
+ mockPluginCtx.hdfsDir = tempContainerDir.toString
+ mockPluginCtx.mockId = "mockId"
+
+ executorPluginTest.setCtx(mockPluginCtx)
+ executorPluginTest.setContainerDir(tempContainerDir.toFile)
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ mockPluginCtx.clear()
+ }
+
+
+ test("Test executor plugin") {
+ val filePath = Files.createTempFile(tempContainerDir, "gc", "log")
+ Assert.assertTrue(filePath.toFile.exists())
+
+ mockPluginCtx.message = DiagnoseConstant.COLLECT
+ executorPluginTest.checkAndDiagnose()
+
+ mockPluginCtx.message = DiagnoseConstant.NOP
+ executorPluginTest.checkAndDiagnose()
+
+ mockPluginCtx.message = DiagnoseConstant.EMPTY
+ executorPluginTest.checkAndDiagnose()
+ }
+}
+
+class MockPluginCtx() extends PluginContext {
+ var message: String = _
+ var mockId: String = _
+ var hdfsDir: String = _
+ var sparkConf: SparkConf = _
+
+ override def ask(input: Any): String = {
+ if (DiagnoseConstant.HDFSDIR.equals(input)) {
+ hdfsDir
+ } else {
+ message
+ }
+ }
+
+ override def executorID(): String = mockId
+
+ override def conf(): SparkConf = sparkConf
+
+ override def metricRegistry(): MetricRegistry = null
+
+ override def hostname(): String = "MockHostname"
+
+ override def resources(): util.Map[String, ResourceInformation] = null
+
+ override def send(message: Any): Unit = {}
+
+ def clear(): Unit = {
+ message = null
+ mockId = null
+ hdfsDir = null
+ sparkConf = null
+ }
+
+}
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala
new file mode 100644
index 0000000000..75df31685e
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Assert
+
+class DiagnosePluginTest extends SparkPluginWithMeta {
+
+ val sparkPluginName: String = classOf[DiagnoseSparkPlugin].getName
+ val diagPlugin = new DiagnoseSparkPlugin
+ val driverPluginTest = new DiagnoseDriverPlugin()
+ val executorPluginTest = new DiagnoseExecutorPlugin()
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ val conf = new SparkConf()
+ .setAppName(getClass.getName)
+ .set(SparkLauncher.SPARK_MASTER, "local[1]")
+ .set("spark.plugins", sparkPluginName)
+ sc = new SparkContext(conf)
+ }
+
+
+ test("Test trigger gc collection") {
+ // get gc success
+ DiagnoseHelper.collectSparderExecutorGc()
+ Assert.assertEquals(DiagnoseConstant.STATE_WAIT, DiagnoseHelper.state)
+
+ // get gc failed
+ DiagnoseHelper.resultCollectionTimeout = 100L
+ DiagnoseHelper.activeExecutorCount = 1
+ DiagnoseHelper.collectSparderExecutorGc()
+ Assert.assertEquals(DiagnoseConstant.STATE_WAIT, DiagnoseHelper.state)
+ }
+
+
+ test("Test driver plugin") {
+ // NEXTCMD
+ DiagnoseHelper.setState(DiagnoseConstant.STATE_WAIT)
+ var reply = driverPluginTest.receive(DiagnoseConstant.NEXTCMD)
+ Assert.assertEquals(DiagnoseConstant.NOP, reply)
+
+ DiagnoseHelper.setState(DiagnoseConstant.STATE_COLLECT)
+ reply = driverPluginTest.receive(DiagnoseConstant.NEXTCMD)
+ Assert.assertEquals(DiagnoseConstant.COLLECT, reply)
+
+ // SENDRESULT
+ DiagnoseHelper.initGcCount(2)
+ reply = driverPluginTest.receive(DiagnoseConstant.SENDRESULT)
+ Assert.assertEquals(DiagnoseConstant.EMPTY, reply)
+ Assert.assertEquals(1, DiagnoseHelper.gcResult.getCount)
+
+ // HDFSDIR
+ reply = driverPluginTest.receive(DiagnoseConstant.HDFSDIR)
+ Assert.assertEquals(KylinConfig.getInstanceFromEnv.getHdfsWorkingDirectory, reply)
+
+ // Other
+ reply = driverPluginTest.receive("Other")
+ Assert.assertEquals(DiagnoseConstant.EMPTY, reply)
+ }
+
+}