Posted to commits@kylin.apache.org by xx...@apache.org on 2023/05/06 06:59:14 UTC

[kylin] 13/38: KYLIN-5528 support collect sparder gc

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 842a0601a643ac83d894ec8a248673c8fd13cfb5
Author: Yaguang Jia <ji...@foxmail.com>
AuthorDate: Thu Feb 23 18:00:57 2023 +0800

    KYLIN-5528 support collect sparder gc
    
    * KYLIN-5528 support collect sparder gc
    
    * KYLIN-5528 add gc option to executor extraJavaOption
    
    * KYLIN-5528 change timeout to 30s
---
 .../src/main/resources/config/init.properties      |   2 +-
 .../apache/kylin/rest/service/SystemService.java   |   5 +
 .../org/apache/kylin/common/KylinConfigBase.java   |  14 +++
 .../src/main/resources/kylin-defaults0.properties  |   2 +-
 .../src/main/resources/config/init.properties      |   2 +-
 .../src/main/resources/config/init.properties      |   2 +-
 .../kylin/rest/controller/NQueryController.java    |   2 +-
 .../src/main/resources/config/init.properties      |   2 +-
 .../engine/spark/job/NSparkExecutableTest.java     |  15 ++-
 .../asyncprofiler/AsyncProfiling.scala             |   2 +-
 .../QueryAsyncProfilerDriverPlugin.scala           |   2 +-
 .../QueryAsyncProfilerSparkPlugin.scala            |   2 +-
 .../diagnose/DiagnoseConstant.scala}               |  21 +++-
 .../diagnose/DiagnoseDriverPlugin.scala}           |  50 +++++----
 .../plugin/diagnose/DiagnoseExecutorPlugin.scala   | 124 +++++++++++++++++++++
 .../query/plugin/diagnose/DiagnoseHelper.scala     |  62 +++++++++++
 .../diagnose/DiagnoseSparkPlugin.scala}            |   9 +-
 .../scala/org/apache/spark/sql/KylinSession.scala  |  56 ++++++----
 .../scala/org/apache/spark/sql/SparderEnv.scala    |   5 +-
 .../SparkPluginWithMeta.scala}                     |   4 +-
 .../asyncprofiler/AsyncProfilingTest.scala         |   5 +-
 .../QueryAsyncProfilerSparkPluginTest.scala        |   2 +-
 .../QueryProfilerDriverPluginTest.scala}           |   5 +-
 .../diagnose/DiagnoseExecutorPluginTest.scala      | 109 ++++++++++++++++++
 .../query/plugin/diagnose/DiagnosePluginTest.scala |  82 ++++++++++++++
 25 files changed, 506 insertions(+), 80 deletions(-)
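
For orientation, a rough sketch of the collection flow this patch introduces (names are the new classes in the diff below; simplified, not the literal call chain):

    // server side, when a FULL or QUERY diag package is requested (SystemService)
    DiagnoseHelper.collectSparderExecutorGc()   // sets state to STATE_COLLECT and waits on a CountDownLatch

    // each executor polls the driver on a fixed 10s schedule (DiagnoseExecutorPlugin)
    //   ctx.ask(NEXTCMD)  -> driver answers COLLECT while the helper state is STATE_COLLECT, otherwise NOP
    //   on COLLECT: copy the local gc* files to HDFS, then ctx.send(SENDRESULT)

    // driver side (DiagnoseDriverPlugin)
    //   SENDRESULT -> DiagnoseHelper.countDownGcResult()
    //   once every active executor has reported, or after the 30s timeout, the state returns to STATE_WAIT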

diff --git a/src/common-booter/src/main/resources/config/init.properties b/src/common-booter/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/common-booter/src/main/resources/config/init.properties
+++ b/src/common-booter/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
 
 # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
index fa467e414d..25c961be8c 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/service/SystemService.java
@@ -60,6 +60,7 @@ import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.query.plugin.diagnose.DiagnoseHelper;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.BackupRequest;
 import org.apache.kylin.rest.request.DiagProgressRequest;
@@ -138,6 +139,8 @@ public class SystemService extends BasicService {
         String[] arguments;
         // full
         if (StringUtils.isEmpty(jobId) && StringUtils.isEmpty(queryId)) {
+            // Sparder executor GC logs should be collected before the FULL and QUERY diag packages are generated
+            DiagnoseHelper.collectSparderExecutorGc();
             if (startTime == null && endTime == null) {
                 startTime = Long.toString(System.currentTimeMillis() - 259200000L);
                 endTime = Long.toString(System.currentTimeMillis());
@@ -153,6 +156,8 @@ public class SystemService extends BasicService {
             arguments = new String[] { jobOpt, jobId, "-destDir", exportFile.getAbsolutePath(), "-diagId", uuid };
             diagPackageType = JOB;
         } else { //query
+            // Sparder executor GC logs should be collected before the FULL and QUERY diag packages are generated
+            DiagnoseHelper.collectSparderExecutorGc();
             arguments = new String[] { "-project", project, "-query", queryId, "-destDir", exportFile.getAbsolutePath(),
                     "-diagId", uuid };
             diagPackageType = QUERY;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 32af04760a..0490ce7b84 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2565,6 +2565,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.job.notification-on-empty-data-load", FALSE));
     }
 
+    public boolean getJobErrorNotificationEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.job.notification-on-job-error", FALSE));
+    }
+
     public Long getStorageResourceSurvivalTimeThreshold() {
         return TimeUtil.timeStringAs(this.getOptional("kylin.storage.resource-survival-time-threshold", "7d"),
                 TimeUnit.MILLISECONDS);
@@ -3812,6 +3816,16 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.build.allow-non-strict-count-check", FALSE));
     }
 
+    public long queryDiagnoseCollectionTimeout() {
+        return TimeUtil.timeStringAs(getOptional("kylin.query.diagnose-collection-timeout", "30s"),
+                TimeUnit.MILLISECONDS);
+    }
+
+    public boolean queryDiagnoseEnable() {
+        return !Boolean.parseBoolean(System.getProperty("spark.local", FALSE))
+                && Boolean.parseBoolean(getOptional("kylin.query.diagnose-enabled", TRUE));
+    }
+
     // ============================================================================
     // Cost based index Planner
     // ============================================================================
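
For reference, a minimal sketch of reading the two new switches (the accessor methods are the ones added above; property names and defaults as in this hunk):

    import org.apache.kylin.common.KylinConfig

    val config = KylinConfig.getInstanceFromEnv
    // kylin.query.diagnose-enabled, default true; forced off when -Dspark.local=true
    val diagnoseEnabled: Boolean = config.queryDiagnoseEnable()
    // kylin.query.diagnose-collection-timeout, default 30s, returned as milliseconds
    val timeoutMs: Long = config.queryDiagnoseCollectionTimeout()
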
diff --git a/src/core-common/src/main/resources/kylin-defaults0.properties b/src/core-common/src/main/resources/kylin-defaults0.properties
index cbacd5d78a..9aee2acdda 100644
--- a/src/core-common/src/main/resources/kylin-defaults0.properties
+++ b/src/core-common/src/main/resources/kylin-defaults0.properties
@@ -115,7 +115,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
 
 # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/data-loading-booter/src/main/resources/config/init.properties b/src/data-loading-booter/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/data-loading-booter/src/main/resources/config/init.properties
+++ b/src/data-loading-booter/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
 
 # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/query-booter/src/main/resources/config/init.properties b/src/query-booter/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/query-booter/src/main/resources/config/init.properties
+++ b/src/query-booter/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
 
 # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
index ea088d38a8..21936b8faf 100644
--- a/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
+++ b/src/query-server/src/main/java/org/apache/kylin/rest/controller/NQueryController.java
@@ -71,7 +71,7 @@ import org.apache.kylin.common.persistence.transaction.StopQueryBroadcastEventNo
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.metadata.query.QueryHistoryRequest;
 import org.apache.kylin.metadata.query.util.QueryHisTransformStandardUtil;
-import org.apache.kylin.query.asyncprofiler.AsyncProfiling;
+import org.apache.kylin.query.plugin.asyncprofiler.AsyncProfiling;
 import org.apache.kylin.rest.cluster.ClusterManager;
 import org.apache.kylin.rest.request.SQLFormatRequest;
 import org.apache.kylin.rest.response.QueryStatisticsResponse;
diff --git a/src/server/src/main/resources/config/init.properties b/src/server/src/main/resources/config/init.properties
index 5dbf475978..3ff1076256 100644
--- a/src/server/src/main/resources/config/init.properties
+++ b/src/server/src/main/resources/config/init.properties
@@ -113,7 +113,7 @@ kylin.storage.columnar.spark-env.HADOOP_CONF_DIR=${kylin_hadoop_conf_dir}
 
 # for any spark config entry in http://spark.apache.org/docs/latest/configuration.html#spark-properties, prefix it with "kap.storage.columnar.spark-conf" and append here
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-executor-log4j.xml -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkap.metadata.identifier=${kylin.metadata.url.identifier} -Dkap.spark.category=sparder -Dkap.spark.project=${job.project} -Dkap.spark.mountDir=${kylin.tool.mount-spark-log-dir} -XX:MaxDirectMemorySize=896M -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTime [...]
 kylin.storage.columnar.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current -Dlog4j.configurationFile=spark-appmaster-log4j.xml
 kylin.storage.columnar.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.storage.columnar.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializer
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
index ce5a26c72d..2fcf39b87d 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkExecutableTest.java
@@ -29,6 +29,8 @@ import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
+import org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -152,12 +154,11 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
             String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
 
             Assert.assertNotNull(cmd);
-            Assert.assertTrue(
-                    cmd.contains("spark.plugins=org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin"));
+            Assert.assertTrue(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
         }
 
         overwriteSystemProp("kylin.engine.spark-conf.spark.plugins",
-                "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin");
+                QueryAsyncProfilerSparkPlugin.class.getCanonicalName());
         {
             val desc = sparkExecutable.getSparkAppDesc();
             desc.setHadoopConfDir(hadoopConf);
@@ -166,9 +167,8 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
             String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
 
             Assert.assertNotNull(cmd);
-            Assert.assertTrue(
-                    cmd.contains("spark.plugins=org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin,"
-                            + "org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin"));
+            Assert.assertTrue(cmd.contains("spark.plugins=" + QueryAsyncProfilerSparkPlugin.class.getCanonicalName()
+                    + "," + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
         }
 
         overwriteSystemProp("kylin.engine.async-profiler-enabled", "false");
@@ -180,8 +180,7 @@ public class NSparkExecutableTest extends NLocalFileMetadataTestCase {
             String cmd = (String) sparkExecutable.sparkJobHandler.generateSparkCmd(kylinConfig, desc);
 
             Assert.assertNotNull(cmd);
-            Assert.assertFalse(
-                    cmd.contains("spark.plugins=org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin"));
+            Assert.assertFalse(cmd.contains("spark.plugins=" + BuildAsyncProfilerSparkPlugin.class.getCanonicalName()));
         }
 
         overwriteSystemProp("kylin.engine.spark-conf.spark.driver.extraJavaOptions",
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala
similarity index 98%
rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala
index d27cfc76f5..8147ba94ec 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfiling.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
 
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.common.asyncprofiler.Message._
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
similarity index 97%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
index 8af9631561..f8fe0e38a6 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
 
 import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool
 import org.apache.spark.SparkContext
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
similarity index 95%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
index fd4306d3af..a2e4af8554 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
 
 import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala
similarity index 66%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
copy to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala
index fd4306d3af..98c3cbf585 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseConstant.scala
@@ -16,14 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.diagnose
 
-import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
-import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}
+object DiagnoseConstant {
+  // state
+  val STATE_WAIT = "STATE_WAIT"
+  val STATE_COLLECT = "STATE_COLLECT"
 
-class QueryAsyncProfilerSparkPlugin extends SparkPlugin {
+  // executor message
+  val NEXTCMD = "NEXTCMD"
+  val SENDRESULT = "SENDRESULT"
+  val HDFSDIR = "HDFSDIR"
 
-  override def driverPlugin(): DriverPlugin = new QueryAsyncProfilerDriverPlugin
+  // driver message
+  val NOP = "NOP"
+  val COLLECT = "COLLECT"
+
+  // empty
+  val EMPTY = ""
 
-  override def executorPlugin(): ExecutorPlugin = new AsyncProfilerExecutorPlugin
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala
similarity index 51%
rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala
index 8af9631561..fed11cd45e 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseDriverPlugin.scala
@@ -16,34 +16,36 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.diagnose
 
-import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.api.plugin.DriverPlugin
 import org.apache.spark.internal.Logging
 
-import java.util
-
-class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
-
-  override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = {
-    // Sparder Driver and KE are always in one JVM, in client mode
-    AsyncProfilerTool.loadAsyncProfilerLib(true)
-    super.init(sc, pluginContext)
-  }
-
+class DiagnoseDriverPlugin extends DriverPlugin with Logging {
   override def receive(message: Any): AnyRef = {
-    import org.apache.kylin.common.asyncprofiler.Message._
+    message match {
+      case DiagnoseConstant.NEXTCMD =>
+        getNextCommand()
+      case DiagnoseConstant.HDFSDIR =>
+        KylinConfig.getInstanceFromEnv.getHdfsWorkingDirectory
+      case DiagnoseConstant.SENDRESULT =>
+        countDownGcResult()
+      case _ => DiagnoseConstant.EMPTY
+    }
+  }
 
-    val (command, executorId, param) = processMessage(message.toString)
-    command match {
-      case NEXT_COMMAND =>
-        AsyncProfiling.nextCommand()
-      case RESULT =>
-        AsyncProfiling.cacheExecutorResult(param, executorId)
-        ""
-      case _ => ""
+  def getNextCommand(): AnyRef = {
+    if (DiagnoseConstant.STATE_COLLECT.equals(DiagnoseHelper.state)) {
+      DiagnoseConstant.COLLECT
+    } else {
+      DiagnoseConstant.NOP
     }
   }
-}
\ No newline at end of file
+
+  def countDownGcResult(): AnyRef = {
+    DiagnoseHelper.countDownGcResult()
+    DiagnoseConstant.EMPTY
+  }
+
+}
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
new file mode 100644
index 0000000000..c6c811b7c3
--- /dev/null
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPlugin.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.kylin.common.util.ExecutorServiceUtil
+import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
+import org.apache.spark.internal.Logging
+import org.joda.time.DateTime
+
+import java.io.File
+import java.util
+import java.util.concurrent.{Executors, TimeUnit}
+
+class DiagnoseExecutorPlugin extends ExecutorPlugin with Logging {
+
+  private val SPARDER_LOG: String = "_sparder_logs"
+  private val LOCAL_GC_FILE_PREFIX: String = "gc"
+  private val DATE_PATTERN = "yyyy-MM-dd"
+  private val checkingInterval: Long = 10000L
+  private val configuration: Configuration = new Configuration()
+  private val fileSystem: FileSystem = FileSystem.get(configuration)
+
+  private val scheduledExecutorService = Executors.newScheduledThreadPool(1,
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Diagnose-%d").build())
+
+  private var state = DiagnoseConstant.STATE_WAIT
+  private var curContainerDir = new File(".")
+  private var sparderLogDir: String = ""
+  private var ctx: PluginContext = _
+
+  override def init(_ctx: PluginContext, extraConf: util.Map[String, String]): Unit = {
+    ctx = _ctx
+    val diagnose = new Runnable {
+      override def run(): Unit = checkAndDiagnose()
+    }
+    logInfo("Diagnose executor plugin is initializing ...")
+    scheduledExecutorService.scheduleWithFixedDelay(
+      diagnose, 0, checkingInterval, TimeUnit.MILLISECONDS)
+  }
+
+  def checkAndDiagnose(): Unit = {
+    try {
+      val reply: AnyRef = ctx.ask(DiagnoseConstant.NEXTCMD)
+      logDebug(s"Executor ${ctx.executorID()} got reply $reply from driver ...")
+      reply match {
+        case DiagnoseConstant.COLLECT =>
+          if (DiagnoseConstant.STATE_WAIT.equals(state)) {
+            state = DiagnoseConstant.STATE_COLLECT
+            logDebug(s"Set executor state to $state")
+            collectGcLog()
+            ctx.send(DiagnoseConstant.SENDRESULT)
+          }
+        case DiagnoseConstant.NOP =>
+          if (!DiagnoseConstant.STATE_WAIT.equals(state)) {
+            state = DiagnoseConstant.STATE_WAIT
+            logDebug(s"Set executor state to $state")
+          }
+        case _ => ""
+      }
+    } catch {
+      case e: Exception =>
+        logInfo("Error while communicating with the driver during diagnose", e)
+    }
+  }
+
+  def collectGcLog(): Unit = {
+    logDebug(s"Collecting sparder gc log files ...")
+    if (sparderLogDir.isEmpty) {
+      val reply = ctx.ask(DiagnoseConstant.HDFSDIR).toString
+      if (reply.isEmpty) {
+        logWarning(s"Cannot get kylin working dir; will not collect sparder executor gc logs.")
+        return
+      } else {
+        sparderLogDir = reply + SPARDER_LOG
+        logInfo(s"HDFS sparder log dir is set to ${sparderLogDir}")
+      }
+    }
+    val filePath = sparderLogDir + File.separator + new DateTime().toString(DATE_PATTERN) +
+      File.separator + ctx.conf().getAppId + File.separator
+    val fileNamePrefix = "executor-%s-".format(ctx.executorID())
+
+    curContainerDir.listFiles().filter(file => file.getName.startsWith(LOCAL_GC_FILE_PREFIX))
+      .map(file => copyLocalFileToHdfs(new Path(file.getAbsolutePath), new Path(filePath, fileNamePrefix + file.getName)))
+  }
+
+  def copyLocalFileToHdfs(local: Path, hdfs: Path): Unit = {
+    logInfo(s"Local gc file path is: ${local}, target hdfs file is: ${hdfs}")
+    fileSystem.copyFromLocalFile(local, hdfs)
+  }
+
+  override def shutdown(): Unit = {
+    ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3)
+    super.shutdown()
+  }
+
+  // for test only
+  def setCtx(_ctx: PluginContext): Unit = {
+    ctx = _ctx
+  }
+
+  // for test only
+  def setContainerDir(_curContainerDir: File): Unit = {
+    curContainerDir = _curContainerDir
+  }
+}
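
The target layout on HDFS, as assembled in collectGcLog above, is roughly:

    <kylin.env.hdfs-working-dir>_sparder_logs/<yyyy-MM-dd>/<spark application id>/executor-<executor id>-<local gc file name>
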
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala
new file mode 100644
index 0000000000..9e0c3d2133
--- /dev/null
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseHelper.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparderEnv
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+object DiagnoseHelper extends Logging {
+  var state: String = DiagnoseConstant.STATE_WAIT
+  var resultCollectionTimeout: Long = KylinConfig.getInstanceFromEnv.queryDiagnoseCollectionTimeout
+  var gcResult: CountDownLatch = _
+
+  // activeExecutorCount will be set for UT
+  var activeExecutorCount: Int = 0
+
+  def collectSparderExecutorGc(): Unit = {
+    if (!KylinConfig.getInstanceFromEnv.isUTEnv) {
+      activeExecutorCount = SparderEnv.getActiveExecutorIds.size
+    }
+    initGcCount(activeExecutorCount)
+
+    setState(DiagnoseConstant.STATE_COLLECT)
+
+    if (gcResult.await(resultCollectionTimeout, TimeUnit.MILLISECONDS)) {
+      logInfo("All executor gc logs have been uploaded to HDFS")
+    } else {
+      logWarning("Timeout while waiting for gc log result")
+    }
+    setState(DiagnoseConstant.STATE_WAIT)
+  }
+
+  def initGcCount(count: Int): Unit = {
+    gcResult = new CountDownLatch(count)
+  }
+
+  def setState(_state: String): Unit = {
+    DiagnoseHelper.state = _state
+  }
+
+  def countDownGcResult(): Unit = {
+    gcResult.countDown()
+  }
+}
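
The handshake above is a plain CountDownLatch; a simplified sketch of the semantics, reusing the members of DiagnoseHelper (not the literal code):

    // trigger side: one count per active executor
    DiagnoseHelper.initGcCount(activeExecutorCount)
    DiagnoseHelper.setState(DiagnoseConstant.STATE_COLLECT)
    val allReported = DiagnoseHelper.gcResult.await(DiagnoseHelper.resultCollectionTimeout, TimeUnit.MILLISECONDS)
    DiagnoseHelper.setState(DiagnoseConstant.STATE_WAIT)   // always reset, even on timeout

    // driver plugin, once per executor that finished uploading
    DiagnoseHelper.countDownGcResult()
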
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala
similarity index 72%
rename from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
rename to src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala
index fd4306d3af..52a90e93de 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseSparkPlugin.scala
@@ -16,14 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.diagnose
 
-import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, SparkPlugin}
 
-class QueryAsyncProfilerSparkPlugin extends SparkPlugin {
+class DiagnoseSparkPlugin extends SparkPlugin {
 
-  override def driverPlugin(): DriverPlugin = new QueryAsyncProfilerDriverPlugin
+  override def driverPlugin(): DriverPlugin = new DiagnoseDriverPlugin
 
-  override def executorPlugin(): ExecutorPlugin = new AsyncProfilerExecutorPlugin
+  override def executorPlugin(): ExecutorPlugin = new DiagnoseExecutorPlugin
 }
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
index 1f6ad0ebba..2ebf1d4826 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -18,28 +18,28 @@
 
 package org.apache.spark.sql
 
-import java.io._
-import java.net.URI
-import java.nio.file.Paths
-
-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kylin.common.{KapConfig, KylinConfig}
 import org.apache.kylin.common.util.{HadoopUtil, Unsafe}
+import org.apache.kylin.common.{KapConfig, KylinConfig}
 import org.apache.kylin.metadata.query.BigQueryThresholdUpdater
+import org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin
+import org.apache.kylin.query.plugin.diagnose.DiagnoseSparkPlugin
 import org.apache.kylin.query.util.ExtractFactory
-import org.springframework.expression.common.TemplateParserContext
-import org.springframework.expression.spel.standard.SpelExpressionParser
-
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.SparkSession.Builder
-import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf, StaticSQLConf}
+import org.apache.spark.sql.internal.{SQLConf, SessionState, SharedState, StaticSQLConf}
 import org.apache.spark.sql.udf.UdfManager
 import org.apache.spark.util.{KylinReflectUtils, Utils}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.springframework.expression.common.TemplateParserContext
+import org.springframework.expression.spel.standard.SpelExpressionParser
+
+import java.io._
+import java.net.URI
+import java.nio.file.Paths
+import scala.collection.JavaConverters._
 
 class KylinSession(
                     @transient val sc: SparkContext,
@@ -112,6 +112,7 @@ class KylinSession(
 object KylinSession extends Logging {
   val NORMAL_FAIR_SCHEDULER_FILE_NAME: String = "/fairscheduler.xml"
   val QUERY_LIMIT_FAIR_SCHEDULER_FILE_NAME: String = "/query-limit-fair-scheduler.xml"
+  val SPARK_PLUGINS_KEY = "spark.plugins"
 
   implicit class KylinBuilder(builder: Builder) {
     var queryCluster: Boolean = true
@@ -328,16 +329,30 @@ object KylinSession extends Logging {
         }
       }
 
+      checkAndSetSparkPlugins(sparkConf)
+
+      sparkConf
+    }
+
+    def checkAndSetSparkPlugins(sparkConf: SparkConf): Unit = {
+      // Sparder diagnose plugin
+      if (kapConfig.getKylinConfig.queryDiagnoseEnable()) {
+        addSparkPlugin(sparkConf, classOf[DiagnoseSparkPlugin].getCanonicalName)
+      }
+
+      // Query profile plugin
       if (kapConfig.getKylinConfig.asyncProfilingEnabled()) {
-        val plugins = sparkConf.get("spark.plugins", "")
-        if (plugins.isEmpty) {
-          sparkConf.set("spark.plugins", "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin")
-        } else {
-          sparkConf.set("spark.plugins", "org.apache.kylin.query.asyncprofiler.QueryAsyncProfilerSparkPlugin," + plugins)
-        }
+        addSparkPlugin(sparkConf, classOf[QueryAsyncProfilerSparkPlugin].getCanonicalName)
       }
+    }
 
-      sparkConf
+    def addSparkPlugin(sparkConf: SparkConf, pluginName: String): Unit = {
+      val plugins = sparkConf.get(SPARK_PLUGINS_KEY, "")
+      if (plugins.isEmpty) {
+        sparkConf.set(SPARK_PLUGINS_KEY, pluginName)
+      } else {
+        sparkConf.set(SPARK_PLUGINS_KEY, pluginName + "," + plugins)
+      }
     }
 
     def buildCluster(): KylinBuilder = {
@@ -411,7 +426,8 @@ object KylinSession extends Logging {
     val parser = new SpelExpressionParser()
     val parserCtx = new TemplateParserContext()
     while ( {
-      templateLine = fileReader.readLine(); templateLine != null
+      templateLine = fileReader.readLine();
+      templateLine != null
     }) {
       processedLine = parser.parseExpression(templateLine, parserCtx).getValue(params, classOf[String]) + "\r\n"
       fileWriter.write(processedLine)
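
Registration prepends, so when both switches are on the profiler plugin ends up first; a minimal standalone sketch mirroring addSparkPlugin above (the addPlugin helper here is illustrative only):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    def addPlugin(name: String): Unit = {
      val existing = conf.get("spark.plugins", "")
      conf.set("spark.plugins", if (existing.isEmpty) name else name + "," + existing)
    }
    addPlugin("org.apache.kylin.query.plugin.diagnose.DiagnoseSparkPlugin")                // added first
    addPlugin("org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin") // prepended
    // spark.plugins = org.apache.kylin.query.plugin.asyncprofiler.QueryAsyncProfilerSparkPlugin,
    //                 org.apache.kylin.query.plugin.diagnose.DiagnoseSparkPlugin
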
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 59ce599fce..cfb469d547 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -346,5 +346,8 @@ object SparderEnv extends Logging {
     configuration
   }
 
-
+  // Return the IDs of the currently active executors
+  def getActiveExecutorIds(): Seq[String] = {
+    getSparkSession.sparkContext.getExecutorIds()
+  }
 }
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala
similarity index 94%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala
index 9db5d72000..064fb4516f 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/SparkPluginWithMeta.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin
 
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase
 import org.apache.spark.{SparkContext, SparkFunSuite}
@@ -24,7 +24,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import java.io.File
 
-trait AsyncPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll {
+trait SparkPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll {
 
   @transient var sc: SparkContext = _
   protected val ut_meta = "../examples/test_case_data/localmeta"
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala
similarity index 94%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala
index 33f497ee33..81188e5aef 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/AsyncProfilingTest.scala
@@ -16,9 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
 
 import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
 import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.{SparkConf, SparkContext}
@@ -26,7 +27,7 @@ import org.mockito.Mockito.mock
 
 import java.io.{File, OutputStream}
 
-class AsyncProfilingTest extends AsyncPluginWithMeta {
+class AsyncProfilingTest extends SparkPluginWithMeta {
 
   val sparkPluginName: String = classOf[BuildAsyncProfilerSparkPlugin].getName
   val flagFileDir: String = System.getProperty("java.io.tmpdir") + "default/jobStepId/"
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
similarity index 96%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
index 1ecbbe6a23..d0e21562cf 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryAsyncProfilerSparkPluginTest.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
 
 import org.apache.kylin.common.asyncprofiler.AsyncProfilerExecutorPlugin
 import org.junit.Assert
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala
similarity index 92%
rename from src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
rename to src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala
index 44c564b3ba..74db3cc3a1 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/asyncprofiler/QueryProfilerDriverPluginTest.scala
@@ -16,13 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.query.plugin.asyncprofiler
 
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.{SparkConf, SparkContext}
 import org.junit.Assert
 
-class QueryAsyncProfilerDriverPluginTest extends AsyncPluginWithMeta {
+class QueryAsyncProfilerDriverPluginTest extends SparkPluginWithMeta {
 
   val sparkPluginName: String = classOf[QueryAsyncProfilerSparkPlugin].getName
 
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala
new file mode 100644
index 0000000000..c725c3f83b
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnoseExecutorPluginTest.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import com.codahale.metrics.MetricRegistry
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
+import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Assert
+
+import java.nio.file.Files
+import java.util
+
+class DiagnoseExecutorPluginTest extends SparkPluginWithMeta {
+
+  val sparkPluginName: String = classOf[DiagnoseSparkPlugin].getName
+  val executorPluginTest = new DiagnoseExecutorPlugin()
+  val mockPluginCtx = new MockPluginCtx
+  val tempContainerDir = Files.createTempDirectory("PluginContainerTest")
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val conf = new SparkConf()
+      .setAppName(getClass.getName)
+      .set(SparkLauncher.SPARK_MASTER, "local[1]")
+      .set("spark.plugins", sparkPluginName)
+    sc = new SparkContext(conf)
+
+    mockPluginCtx.sparkConf = sc.getConf
+    mockPluginCtx.hdfsDir = tempContainerDir.toString
+    mockPluginCtx.mockId = "mockId"
+
+    executorPluginTest.setCtx(mockPluginCtx)
+    executorPluginTest.setContainerDir(tempContainerDir.toFile)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    mockPluginCtx.clear()
+  }
+
+
+  test("Test executor plugin") {
+    val filePath = Files.createTempFile(tempContainerDir, "gc", "log")
+    Assert.assertTrue(filePath.toFile.exists())
+
+    mockPluginCtx.message = DiagnoseConstant.COLLECT
+    executorPluginTest.checkAndDiagnose()
+
+    mockPluginCtx.message = DiagnoseConstant.NOP
+    executorPluginTest.checkAndDiagnose()
+
+    mockPluginCtx.message = DiagnoseConstant.EMPTY
+    executorPluginTest.checkAndDiagnose()
+  }
+}
+
+class MockPluginCtx() extends PluginContext {
+  var message: String = _
+  var mockId: String = _
+  var hdfsDir: String = _
+  var sparkConf: SparkConf = _
+
+  override def ask(input: Any): String = {
+    if (DiagnoseConstant.HDFSDIR.equals(input)) {
+      hdfsDir
+    } else {
+      message
+    }
+  }
+
+  override def executorID(): String = mockId
+
+  override def conf(): SparkConf = sparkConf
+
+  override def metricRegistry(): MetricRegistry = null
+
+  override def hostname(): String = "MockHostname"
+
+  override def resources(): util.Map[String, ResourceInformation] = null
+
+  override def send(message: Any): Unit = {}
+
+  def clear(): Unit = {
+    message = null
+    mockId = null
+    hdfsDir = null
+    sparkConf = null
+  }
+
+}
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala
new file mode 100644
index 0000000000..75df31685e
--- /dev/null
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/plugin/diagnose/DiagnosePluginTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query.plugin.diagnose
+
+import org.apache.kylin.common.KylinConfig
+import org.apache.kylin.query.plugin.SparkPluginWithMeta
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.Assert
+
+class DiagnosePluginTest extends SparkPluginWithMeta {
+
+  val sparkPluginName: String = classOf[DiagnoseSparkPlugin].getName
+  val diagPlugin = new DiagnoseSparkPlugin
+  val driverPluginTest = new DiagnoseDriverPlugin()
+  val executorPluginTest = new DiagnoseExecutorPlugin()
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    val conf = new SparkConf()
+      .setAppName(getClass.getName)
+      .set(SparkLauncher.SPARK_MASTER, "local[1]")
+      .set("spark.plugins", sparkPluginName)
+    sc = new SparkContext(conf)
+  }
+
+
+  test("Test trigger gc collection") {
+    // get gc success
+    DiagnoseHelper.collectSparderExecutorGc()
+    Assert.assertEquals(DiagnoseConstant.STATE_WAIT, DiagnoseHelper.state)
+
+    // get gc failed
+    DiagnoseHelper.resultCollectionTimeout = 100L
+    DiagnoseHelper.activeExecutorCount = 1
+    DiagnoseHelper.collectSparderExecutorGc()
+    Assert.assertEquals(DiagnoseConstant.STATE_WAIT, DiagnoseHelper.state)
+  }
+
+
+  test("Test driver plugin") {
+    // NEXTCMD
+    DiagnoseHelper.setState(DiagnoseConstant.STATE_WAIT)
+    var reply = driverPluginTest.receive(DiagnoseConstant.NEXTCMD)
+    Assert.assertEquals(DiagnoseConstant.NOP, reply)
+
+    DiagnoseHelper.setState(DiagnoseConstant.STATE_COLLECT)
+    reply = driverPluginTest.receive(DiagnoseConstant.NEXTCMD)
+    Assert.assertEquals(DiagnoseConstant.COLLECT, reply)
+
+    // SENDRESULT
+    DiagnoseHelper.initGcCount(2)
+    reply = driverPluginTest.receive(DiagnoseConstant.SENDRESULT)
+    Assert.assertEquals(DiagnoseConstant.EMPTY, reply)
+    Assert.assertEquals(1, DiagnoseHelper.gcResult.getCount)
+
+    // HDFSDIR
+    reply = driverPluginTest.receive(DiagnoseConstant.HDFSDIR)
+    Assert.assertEquals(KylinConfig.getInstanceFromEnv.getHdfsWorkingDirectory, reply)
+
+    // Other
+    reply = driverPluginTest.receive("Other")
+    Assert.assertEquals(DiagnoseConstant.EMPTY, reply)
+  }
+
+}