You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:11 UTC

[kylin] 15/17: KYLIN-5398 Flame graph availability fix

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d3ecc5a0847da26178e84cf13d55c0378764452e
Author: Guoliang Sun <gu...@kyligence.io>
AuthorDate: Thu Nov 10 15:13:06 2022 +0800

    KYLIN-5398 Flame graph availability fix
---
 .../libasyncProfiler-linux-arm64.so                | Bin 0 -> 298256 bytes
 .../libasyncProfiler-linux-x64.so                  | Bin 0 -> 303903 bytes
 build/async-profiler-lib/libasyncProfiler-mac.so   | Bin 0 -> 634272 bytes
 build/release/compress.sh                          |   5 ++
 .../java/org/apache/kylin/common/KapConfig.java    |  13 +++-
 .../org/apache/kylin/common/KylinConfigBase.java   |  11 ++-
 .../kylin/common/asyncprofiler/AsyncArchUtil.java  |  77 +++++++++++++++++++++
 .../kylin/common/asyncprofiler/AsyncProfiler.java  |  69 +++++++++++++-----
 .../common/asyncprofiler/AsyncProfilerUtils.java   |   4 ++
 .../libasyncProfiler-linux-arm64.so                | Bin 0 -> 298256 bytes
 .../libasyncProfiler-linux-x64.so                  | Bin 0 -> 303903 bytes
 .../async-profiler-lib/libasyncProfiler-mac.so     | Bin 0 -> 634272 bytes
 .../async-profiler-lib/linux64/libasyncProfiler.so | Bin 314098 -> 0 bytes
 .../async-profiler-lib/macOS/libasyncProfiler.so   | Bin 239460 -> 0 bytes
 .../AsyncProfilerExecutorPlugin.scala              |   8 ++-
 .../common/asyncprofiler/AsyncProfilerTool.scala   |   8 ++-
 .../apache/kylin/common/KylinConfigBaseTest.java   |   1 +
 .../common/asyncprofiler/AsyncArchUtilTest.java}   |  34 ++++-----
 .../common/asyncprofiler/AsyncProfilerTest.java    |  17 +++--
 .../asyncprofiler/AsyncProfilerToolTest.java       |  24 +++++++
 .../asyncprofiler/AsyncProfilerUtilsTest.java      |  16 +++++
 .../org/apache/kylin/rest/service/JobService.java  |   3 +-
 .../localmeta/lib/libasyncProfiler-linux-arm64.so  | Bin 0 -> 298256 bytes
 .../localmeta/lib/libasyncProfiler-linux-x64.so    | Bin 0 -> 303903 bytes
 .../localmeta/lib/libasyncProfiler-mac.so          | Bin 0 -> 634272 bytes
 .../kylin/engine/spark/job/NSparkExecutable.java   |  55 +++++++++------
 .../kylin/query/asyncprofiler/AsyncProfiling.scala |   9 ++-
 .../QueryAsyncProfilerDriverPlugin.scala           |   7 +-
 .../query/asyncprofiler/AsyncPluginWithMeta.scala  |   8 +--
 .../query/asyncprofiler/AsyncProfilingTest.scala   |  45 +++++-------
 .../QueryAsyncProfilerDriverPluginTest.scala       |  27 ++------
 .../BuildAsyncProfilerDriverPlugin.scala           |   9 ++-
 32 files changed, 318 insertions(+), 132 deletions(-)

diff --git a/build/async-profiler-lib/libasyncProfiler-linux-arm64.so b/build/async-profiler-lib/libasyncProfiler-linux-arm64.so
new file mode 100644
index 0000000000..b959823506
Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-linux-arm64.so differ
diff --git a/build/async-profiler-lib/libasyncProfiler-linux-x64.so b/build/async-profiler-lib/libasyncProfiler-linux-x64.so
new file mode 100644
index 0000000000..6d961cec0d
Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-linux-x64.so differ
diff --git a/build/async-profiler-lib/libasyncProfiler-mac.so b/build/async-profiler-lib/libasyncProfiler-mac.so
new file mode 100644
index 0000000000..ad45237d5f
Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-mac.so differ
diff --git a/build/release/compress.sh b/build/release/compress.sh
index 61c7f9eb9d..bf9d584b92 100755
--- a/build/release/compress.sh
+++ b/build/release/compress.sh
@@ -43,6 +43,11 @@ if [[ -d "influxdb" ]]; then
     cp -rf postgresql ${package_name}/
 fi
 
+# copy async profiler native files
+cp -rf async-profiler-lib/libasyncProfiler-mac.so "${package_name}"/lib/libasyncProfiler-mac.so
+cp -rf async-profiler-lib/libasyncProfiler-linux-x64.so "${package_name}"/lib/libasyncProfiler-linux-x64.so
+cp -rf async-profiler-lib/libasyncProfiler-linux-arm64.so "${package_name}"/lib/libasyncProfiler-linux-arm64.so
+
 # Add ssb data preparation files
 mkdir -p ${package_name}/tool/ssb
 cp -rf ../src/examples/sample_cube/data ${package_name}/tool/ssb/
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
index 6cb7cbe9b2..06123ccf6c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
@@ -22,12 +22,14 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Map;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.annotation.Clarification;
 import org.apache.kylin.common.util.EncryptUtil;
 import org.apache.kylin.common.util.FileUtils;
 
+@Slf4j
 @Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
 public class KapConfig {
 
@@ -248,7 +250,7 @@ public class KapConfig {
     }
 
     /**
-     *  Smart modeling
+     * Smart modeling
      */
     public String getSmartModelingConf(String conf) {
         return config.getOptional("kylin.smart.conf." + conf, null);
@@ -515,16 +517,21 @@ public class KapConfig {
 
     public String sparderFiles() {
         try {
-            File storageFile = new File(getKylinConfig().getLogSparkExecutorPropertiesFile());
+            File storageFile = new File(config.getLogSparkExecutorPropertiesFile());
             String additionalFiles = storageFile.getCanonicalPath();
-            storageFile = new File(getKylinConfig().getLogSparkAppMasterPropertiesFile());
+            storageFile = new File(config.getLogSparkAppMasterPropertiesFile());
             if (additionalFiles.isEmpty()) {
                 additionalFiles = storageFile.getCanonicalPath();
             } else {
                 additionalFiles = additionalFiles + "," + storageFile.getCanonicalPath();
             }
+            if (config.asyncProfilingEnabled()) {
+                additionalFiles = additionalFiles + "," + config.getAsyncProfilerFiles();
+            }
+            log.info("Sparder additionalFiles: {}", additionalFiles);
             return config.getOptional("kylin.query.engine.sparder-additional-files", additionalFiles);
         } catch (IOException e) {
+            log.error("Add sparderFiles failed, " + e);
             return "";
         }
     }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0802fc242e..557901c837 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -19,6 +19,8 @@
 package org.apache.kylin.common;
 
 import static java.lang.Math.toIntExact;
+import static org.apache.kylin.common.asyncprofiler.AsyncProfiler.ASYNC_PROFILER_LIB_LINUX_ARM64;
+import static org.apache.kylin.common.asyncprofiler.AsyncProfiler.ASYNC_PROFILER_LIB_LINUX_X64;
 import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_CONNECTION_URL_KEY;
 import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_DRIVER_KEY;
 import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_PASS_KEY;
@@ -2636,6 +2638,13 @@ public abstract class KylinConfigBase implements Serializable {
         return getLogPropertyFile("spark-appmaster-log4j.xml");
     }
 
+    public String getAsyncProfilerFiles() throws IOException {
+        String kylinHome = getKylinHomeWithoutWarn();
+        File libX64 = new File(kylinHome + "/lib/" + ASYNC_PROFILER_LIB_LINUX_X64);
+        File libArm64 = new File(kylinHome + "/lib/" + ASYNC_PROFILER_LIB_LINUX_ARM64);
+        return libX64.getCanonicalPath() + "," + libArm64.getCanonicalPath();
+    }
+
     private String getLogPropertyFile(String filename) {
         String parentFolder;
         if (isDevEnv()) {
@@ -3597,7 +3606,7 @@ public abstract class KylinConfigBase implements Serializable {
 
     public boolean buildJobProfilingEnabled() {
         return !Boolean.parseBoolean(System.getProperty("spark.local", FALSE))
-                && Boolean.parseBoolean(getOptional("kylin.engine.async-profiler-enabled", FALSE));
+                && Boolean.parseBoolean(getOptional("kylin.engine.async-profiler-enabled", TRUE));
     }
 
     public long buildJobProfilingResultTimeout() {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java
new file mode 100644
index 0000000000..a1b77ab622
--- /dev/null
+++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.asyncprofiler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+public class AsyncArchUtil {
+
+    public enum ArchType {
+        LINUX_X64(), LINUX_ARM64()
+    }
+
+    private static final Map<String, ArchType> ARCH_TO_PROCESSOR = new HashMap<>();
+
+    static {
+        addProcessors(ArchType.LINUX_X64, "x86_64", "amd64");
+        addProcessors(ArchType.LINUX_ARM64, "aarch64");
+    }
+
+    public static ArchType getProcessor() {
+        return getProcessor(getSystemProperty("os.arch"));
+    }
+
+    /**
+     * Returns a {@link ArchType} object with the given value {@link String}. The {@link String} must be
+     * like a value returned by the os.arch System Property.
+     *
+     * @param osArch A {@link String} like a value returned by the os.arch System Property.
+     * @return A {@link ArchType} when it exists, else {@code null}.
+     */
+    public static ArchType getProcessor(final String osArch) {
+        return ARCH_TO_PROCESSOR.get(osArch);
+    }
+
+    /**
+     * <p>
+     * Gets a System property, defaulting to {@code null} if the property cannot be read.
+     * </p>
+     * <p>
+     * If a {@code SecurityException} is caught, it is swallowed silently and the return value is
+     * {@code null}.
+     * </p>
+     *
+     * @param property the system property name
+     * @return the system property value or {@code null} if a security problem occurs
+     */
+    private static String getSystemProperty(final String property) {
+        try {
+            return System.getProperty(property);
+        } catch (final SecurityException ex) {
+            // we are not allowed to look at this property, so the value defaults to null.
+            return null;
+        }
+    }
+
+    private static void addProcessors(ArchType archType, final String... keys) {
+        Stream.of(keys).forEach(key -> ARCH_TO_PROCESSOR.put(key, archType));
+    }
+}
\ No newline at end of file
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java
index c49cdffe20..a0ba5f13a1 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java
@@ -18,27 +18,31 @@
 
 package org.apache.kylin.common.asyncprofiler;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Objects;
 
+import org.apache.kylin.common.KylinConfigBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class AsyncProfiler {
 
     private static final Logger logger = LoggerFactory.getLogger(AsyncProfiler.class);
 
-    private static final String LIB_FILE = "libasyncProfiler.so";
-    private static final String LIB_PARENT = "/async-profiler-lib";
-    private static final String MAC_LIB_PATH = LIB_PARENT + "/macOS/" + LIB_FILE;
-    private static final String LINUX_64_LIB_PATH = LIB_PARENT + "/linux64/" + LIB_FILE;
+    // async profiler native files
+    public static final String ASYNC_PROFILER_LIB_MAC = "libasyncProfiler-mac.so";
+    public static final String ASYNC_PROFILER_LIB_LINUX_X64 = "libasyncProfiler-linux-x64.so";
+    public static final String ASYNC_PROFILER_LIB_LINUX_ARM64 = "libasyncProfiler-linux-arm64.so";
+    private static final String LIB_PARENT = "/async-profiler-lib/";
 
     private static AsyncProfiler profiler;
     private boolean loaded = false;
 
-    public static synchronized AsyncProfiler getInstance() {
+    public static synchronized AsyncProfiler getInstance(boolean loadLocalLib) {
         if (profiler == null) {
-            profiler = new AsyncProfiler();
+            profiler = new AsyncProfiler(loadLocalLib);
         }
         return profiler;
     }
@@ -55,14 +59,43 @@ public class AsyncProfiler {
         logger.info("Test arg for ut: {}", ignore);
     }
 
-    private AsyncProfiler() {
+    private AsyncProfiler(boolean loadLocalLib) {
         try {
             boolean isTestingOnLocalMac = System.getProperty("os.name", "").contains("Mac")
                     || System.getProperty("os.name", "").contains("OS X");
             if (isTestingOnLocalMac) {
-                loadLibAsyncProfilerSO(MAC_LIB_PATH);
+                loadLibAsyncProfilerSO(LIB_PARENT + ASYNC_PROFILER_LIB_MAC);
             } else {
-                loadLibAsyncProfilerSO(LINUX_64_LIB_PATH);
+                String libName;
+                File libPath;
+
+                // Select native lib loading based on machine architecture
+                AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor();
+                logger.info("Machine's archType: {}", archType);
+                switch (archType) {
+                case LINUX_ARM64:
+                    libName = ASYNC_PROFILER_LIB_LINUX_ARM64;
+                    break;
+                case LINUX_X64:
+                default:
+                    libName = ASYNC_PROFILER_LIB_LINUX_X64;
+                    break;
+                }
+
+                // Adapting load paths based on Spark deployment patterns
+                if (loadLocalLib) {
+                    libPath = new File(KylinConfigBase.getKylinHome() + "/lib/" + libName);
+                } else {
+                    libPath = new File(libName);
+                }
+                logger.info("AsyncProfiler libPath: {}, exists: {}", libPath.getAbsolutePath(),
+                        Files.exists(libPath.toPath()));
+                // if the file is missing (e.g. in ut), fall back to the lib bundled on the classpath
+                if (libPath.exists()) {
+                    System.load(libPath.getAbsolutePath());
+                } else {
+                    loadLibAsyncProfilerSO(LIB_PARENT + libName);
+                }
             }
             loaded = true;
         } catch (Exception e) {
@@ -71,12 +104,12 @@ public class AsyncProfiler {
     }
 
     private void loadLibAsyncProfilerSO(String libPath) throws IOException {
-        final java.nio.file.Path tmpLib = java.io.File.createTempFile("libasyncProfiler", ".so").toPath();
-        java.nio.file.Files.copy(
-                Objects.requireNonNull(AsyncProfilerTool.class.getResourceAsStream(libPath)),
-                tmpLib,
-                java.nio.file.StandardCopyOption.REPLACE_EXISTING);
-        System.load(tmpLib.toAbsolutePath().toString());
+        File asyncProfilerLib = File.createTempFile("libasyncProfiler", ".so");
+        java.nio.file.Files.copy(Objects.requireNonNull(AsyncProfilerTool.class.getResourceAsStream(libPath)),
+                asyncProfilerLib.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+        logger.info("AsyncProfiler will try to load from libPath: {}, exists: {}", asyncProfilerLib.getAbsolutePath(),
+                asyncProfilerLib.exists());
+        System.load(asyncProfilerLib.getAbsolutePath());
     }
 
     public boolean isLoaded() {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
index e86648268a..cd74a7cd33 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
@@ -53,6 +53,10 @@ public class AsyncProfilerUtils {
         this.cachedResult = countDownLatch;
     }
 
+    public void build(File localCacheDir) {
+        this.localCacheDir = localCacheDir;
+    }
+
     public void build(long resultCollectionTimeout, File localCacheDir) {
         this.resultCollectionTimeout = resultCollectionTimeout;
         this.localCacheDir = localCacheDir;
diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so
new file mode 100644
index 0000000000..b959823506
Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so
new file mode 100644
index 0000000000..6d961cec0d
Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so
new file mode 100644
index 0000000000..ad45237d5f
Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so b/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so
deleted file mode 100755
index 4153f52868..0000000000
Binary files a/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so and /dev/null differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so b/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so
deleted file mode 100755
index c7298c6b82..0000000000
Binary files a/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so and /dev/null differ
diff --git a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala
index fe4a3918a2..e423c98f81 100644
--- a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala
+++ b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala
@@ -20,6 +20,7 @@ package org.apache.kylin.common.asyncprofiler
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.kylin.common.asyncprofiler.Message._
+import org.apache.kylin.common.util.ExecutorServiceUtil
 import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
 import org.apache.spark.internal.Logging
 
@@ -40,7 +41,8 @@ class AsyncProfilerExecutorPlugin extends ExecutorPlugin with Logging {
     val profile = new Runnable {
       override def run(): Unit = checkAndProfile()
     }
-    log.debug(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
+    AsyncProfilerTool.loadAsyncProfilerLib(false)
+    log.info(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
     scheduledExecutorService.scheduleWithFixedDelay(
       profile, 0, checkingInterval, TimeUnit.MILLISECONDS)
   }
@@ -80,4 +82,8 @@ class AsyncProfilerExecutorPlugin extends ExecutorPlugin with Logging {
     ctx.send(msg)
   }
 
+  override def shutdown(): Unit = {
+    ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3)
+    super.shutdown()
+  }
 }
diff --git a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala
index 0deb703da9..69bb7c72bb 100644
--- a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala
+++ b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala
@@ -27,7 +27,13 @@ object AsyncProfilerTool {
 
   val log: Logger = LoggerFactory.getLogger(AsyncProfilerTool.getClass)
 
-  private val profiler = AsyncProfiler.getInstance()
+  private var profiler: AsyncProfiler = _
+
+  def loadAsyncProfilerLib(loadLocalLib: Boolean): Unit = {
+    // Local  - load for the Sparder Driver (or the Spark Driver when it runs in client mode)
+    // Remote - load for all Executors (or the Spark Engine when it runs in cluster mode)
+    profiler = AsyncProfiler.getInstance(loadLocalLib)
+  }
 
   private var _running = false
 
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index f2c3cb1a2f..f60a2b003b 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1191,6 +1191,7 @@ class KylinConfigBaseTest {
     @Test
     void testBuildJobProfilingEnabled() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.engine.async-profiler-enabled", "false");
         assertFalse(config.buildJobProfilingEnabled());
         config.setProperty("kylin.engine.async-profiler-enabled", "true");
         assertTrue(config.buildJobProfilingEnabled());
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java
similarity index 50%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
copy to src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java
index 68f344186b..0ba16c4a75 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java
@@ -16,29 +16,25 @@
  * limitations under the License.
  */
 
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.common.asyncprofiler;
 
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
-import org.apache.spark.internal.Logging
+import org.junit.Assert;
+import org.junit.Test;
 
-import java.util
+public class AsyncArchUtilTest {
 
-class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
-
-  override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = super.init(sc, pluginContext)
+    @Test
+    public void testGetProcessor() {
+        AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor();
+        Assert.assertNotNull(archType);
+    }
 
-  override def receive(message: Any): AnyRef = {
-    import org.apache.kylin.common.asyncprofiler.Message._
+    @Test
+    public void testArchType() {
+        AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor("x86_64");
+        Assert.assertEquals(AsyncArchUtil.ArchType.LINUX_X64, archType);
 
-    val (command, executorId, param) = processMessage(message.toString)
-    command match {
-      case NEXT_COMMAND =>
-        AsyncProfiling.nextCommand()
-      case RESULT =>
-        AsyncProfiling.cacheExecutorResult(param, executorId)
-        ""
-      case _ => ""
+        archType = AsyncArchUtil.getProcessor("aarch64");
+        Assert.assertEquals(AsyncArchUtil.ArchType.LINUX_ARM64, archType);
     }
-  }
 }
\ No newline at end of file
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java
index 82ab3f7a35..176d9ad119 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java
@@ -26,8 +26,13 @@ import org.junit.Test;
 public class AsyncProfilerTest {
 
     @Test
-    public void testLoaded() {
-        Assert.assertTrue(AsyncProfiler.getInstance().isLoaded());
+    public void testLocalLoaded() {
+        Assert.assertTrue(AsyncProfiler.getInstance(true).isLoaded());
+    }
+
+    @Test
+    public void testRemoteLoaded() {
+        Assert.assertTrue(AsyncProfiler.getInstance(false).isLoaded());
     }
 
     // This may success in local Mac, but failed in CI
@@ -36,7 +41,7 @@ public class AsyncProfilerTest {
         System.setProperty("os.name", "Mac");
         String errorMsg = "";
         try {
-            AsyncProfiler.getInstance();
+            AsyncProfiler.getInstance(true);
         } catch (Throwable throwable) {
             errorMsg = throwable.getMessage();
         }
@@ -45,7 +50,7 @@ public class AsyncProfilerTest {
 
     @Test
     public void testExecute() throws IOException {
-        AsyncProfiler asyncProfiler = AsyncProfiler.getInstance();
+        AsyncProfiler asyncProfiler = AsyncProfiler.getInstance(true);
         try {
             asyncProfiler.execute("start,event=cpu");
             asyncProfiler.stop();
@@ -59,12 +64,12 @@ public class AsyncProfilerTest {
     @Test
     public void testStop() {
         Assert.assertThrows("Profiler is not active", IllegalStateException.class,
-                AsyncProfiler.getInstance()::stop);
+                AsyncProfiler.getInstance(true)::stop);
     }
 
     @Test
     public void testAsyncProfilerUtInstance() {
-        AsyncProfiler originInstance = AsyncProfiler.getInstance();
+        AsyncProfiler originInstance = AsyncProfiler.getInstance(true);
         AsyncProfiler utInstance = AsyncProfiler.utInstance();
         Assert.assertSame(originInstance, utInstance);
     }
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java
index 85b73a80bc..2cdb832339 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java
@@ -40,6 +40,30 @@ public class AsyncProfilerToolTest extends NLocalFileMetadataTestCase {
         this.cleanupTestMetadata();
     }
 
+    @Test
+    public void testLoadLocalAsyncProfilerLib() {
+        AsyncProfilerTool.loadAsyncProfilerLib(true);
+        String errorMsg = "";
+        try {
+            AsyncProfilerTool.status();
+        } catch (Exception e) {
+            errorMsg = e.getMessage();
+        }
+        Assert.assertTrue(errorMsg.isEmpty());
+    }
+
+    @Test
+    public void testLoadRemoteAsyncProfilerLib() {
+        AsyncProfilerTool.loadAsyncProfilerLib(false);
+        String errorMsg = "";
+        try {
+            AsyncProfilerTool.status();
+        } catch (Exception e) {
+            errorMsg = e.getMessage();
+        }
+        Assert.assertTrue(errorMsg.isEmpty());
+    }
+
     @Test
     public void testStartAndStop() {
         try {
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java
index 7eb636839a..fba1ad54a6 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java
@@ -57,6 +57,22 @@ public class AsyncProfilerUtilsTest {
         Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir);
     }
 
+    @Test
+    public void testBuildWithNewLocalCacheDir() throws IOException {
+        AsyncProfilerUtils asyncProfilerUtilsBuild = AsyncProfilerUtils.getInstance();
+        asyncProfilerUtilsBuild.build(new CountDownLatch(2));
+        Assert.assertEquals(2, asyncProfilerUtilsBuild.cachedResult.getCount());
+
+        File testFile = Files.createTempDirectory("ke-build-async-test-profiler-").toFile();
+        asyncProfilerUtilsBuild.build(2L, testFile);
+        Assert.assertEquals(2L, asyncProfilerUtilsBuild.resultCollectionTimeout);
+        Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir);
+
+        testFile = Files.createTempDirectory("ke-build-async-test-profiler-").toFile();
+        asyncProfilerUtilsBuild.build(testFile);
+        Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir);
+    }
+
     @Test
     public void testWaitForResultTimeout() throws IOException, InterruptedException {
         AsyncProfilerUtils asyncProfilerUtils = AsyncProfilerUtils.getInstance();
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 22424681c5..494ac3b50e 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -47,7 +47,6 @@ import java.util.stream.Stream;
 
 import javax.servlet.http.HttpServletRequest;
 
-import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.io.IOUtils;
@@ -98,6 +97,7 @@ import org.apache.kylin.job.execution.StageBase;
 import org.apache.kylin.metadata.cube.model.NBatchConstants;
 import org.apache.kylin.metadata.cube.model.NDataSegment;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.model.FusionModel;
 import org.apache.kylin.metadata.model.FusionModelManager;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -1340,6 +1340,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
     }
 
     public void setResponseLanguage(HttpServletRequest request) {
+        aclEvaluate.checkIsGlobalAdmin();
         String languageToHandle = request.getHeader(HttpHeaders.ACCEPT_LANGUAGE);
         if (languageToHandle == null) {
             ErrorCode.setMsg("cn");
diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so
new file mode 100644
index 0000000000..b959823506
Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so differ
diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so
new file mode 100644
index 0000000000..6d961cec0d
Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so differ
diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so
new file mode 100644
index 0000000000..ad45237d5f
Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so differ
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index a842c926ca..d31d02f83e 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -18,13 +18,23 @@
 
 package org.apache.kylin.engine.spark.job;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
-import lombok.val;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -60,26 +70,18 @@ import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
 import org.apache.parquet.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.val;
 
 /**
  *
@@ -883,6 +885,13 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
         filePaths.add(kylinConf.getLogSparkAppMasterPropertiesFile());
         filePaths.add(kylinConf.getLogSparkDriverPropertiesFile());
         filePaths.add(kylinConf.getLogSparkExecutorPropertiesFile());
+        if (kylinConf.buildJobProfilingEnabled()) {
+            try {
+                filePaths.add(kylinConf.getAsyncProfilerFiles());
+            } catch (IOException e) {
+                logger.error("Add SparkPluginFile failed.", e);
+            }
+        }
         filePaths.add(sparkConf.get(SPARK_FILES_1));
         filePaths.add(sparkConf.get(SPARK_FILES_2));
 
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
index e1dc51869c..d27cfc76f5 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
@@ -25,14 +25,14 @@ import org.apache.kylin.common.exception.{KylinException, QueryErrorCode}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparderEnv
 
-import java.io.OutputStream
+import java.io.{File, OutputStream}
 import java.nio.file.Files
 import java.util.concurrent.CountDownLatch
 
 
 object AsyncProfiling extends Logging {
 
-  private val localCacheDir = Files.createTempDirectory("ke-async-profiler-result-").toFile
+  var localCacheDir: File = Files.createTempDirectory("ke-async-profiler-result-").toFile
   localCacheDir.deleteOnExit()
   private val resultCollectionTimeout = KylinConfig.getInstanceFromEnv.asyncProfilingResultTimeout
   private val profilingTimeout = KylinConfig.getInstanceFromEnv.asyncProfilingProfileTimeout
@@ -57,6 +57,11 @@ object AsyncProfiling extends Logging {
         throw new KylinException(QueryErrorCode.PROFILING_ALREADY_STARTED, "profiling is already started, stop it first")
       }
       logDebug("profiler start")
+      // Linux may periodically purge /tmp, so recreate the local cache directory if it has disappeared
+      if (!localCacheDir.exists()) {
+        localCacheDir = Files.createTempDirectory("ke-async-profiler-result-").toFile
+        asyncProfilerUtils.build(localCacheDir)
+      }
       asyncProfilerUtils.cleanLocalCache()
       // expecting driver + count(executor) amount of results
       cachedResult = new CountDownLatch(
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
index 68f344186b..8af9631561 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.query.asyncprofiler
 
+import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool
 import org.apache.spark.SparkContext
 import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
 import org.apache.spark.internal.Logging
@@ -26,7 +27,11 @@ import java.util
 
 class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
 
-  override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = super.init(sc, pluginContext)
+  override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = {
+    // The Sparder driver always runs in client mode, i.e. in the same JVM as KE
+    AsyncProfilerTool.loadAsyncProfilerLib(true)
+    super.init(sc, pluginContext)
+  }
 
   override def receive(message: Any): AnyRef = {
     import org.apache.kylin.common.asyncprofiler.Message._
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
index d94a560dd8..9db5d72000 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
@@ -19,7 +19,6 @@
 package org.apache.kylin.query.asyncprofiler
 
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase
-import org.apache.spark.sql.{SparderEnv, SparkSession}
 import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.scalatest.BeforeAndAfterAll
 
@@ -44,11 +43,10 @@ trait AsyncPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll {
   }
 
   protected def clearSparkSession(): Unit = {
-    if (SparderEnv.isSparkAvailable) {
-      SparderEnv.getSparkSession.stop()
+    if (sc != null) {
+      sc.stop()
+      sc = null
     }
-    SparkSession.setActiveSession(null)
-    SparkSession.setDefaultSession(null)
   }
 
   override def beforeAll(): Unit = {
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
index c05923664d..21c08c4ee7 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
@@ -24,7 +24,7 @@ import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.{SparkConf, SparkContext}
 import org.mockito.Mockito.mock
 
-import java.io.OutputStream
+import java.io.{File, OutputStream}
 
 class AsyncProfilingTest extends AsyncPluginWithMeta {
 
@@ -34,55 +34,46 @@ class AsyncProfilingTest extends AsyncPluginWithMeta {
   val statusFileName: String = flagFileDir + "/status"
   val dumpFileName: String = flagFileDir + "/dump.tar.gz"
 
-  test("init AsyncProfiling") {
-    AsyncProfiling.asyncProfilerUtils
-  }
-
-  test("start and dump AsyncProfiling") {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
     val conf = new SparkConf()
       .setAppName(getClass.getName)
       .set(SparkLauncher.SPARK_MASTER, "local[1]")
       .set("spark.plugins", sparkPluginName)
 
     sc = new SparkContext(conf)
+  }
+
+  test("init AsyncProfiling") {
+    AsyncProfiling.asyncProfilerUtils
+  }
+
+  test("start and dump AsyncProfiling") {
     AsyncProfiling.start("")
     AsyncProfiling.dump("")
+  }
 
-    sc.stop()
-    sc = null
+  test("start with localCacheDir by delete") {
+    AsyncProfiling.nextCommand()
+    val localCacheDir = AsyncProfiling.localCacheDir
+    new File(localCacheDir.getAbsolutePath).delete()
+    AsyncProfiling.start("")
+    AsyncProfiling.dump("")
   }
 
   test("waitForResult AsyncProfiling") {
     KylinConfig.getInstanceFromEnv.setProperty("kylin.query.async-profiler-result-timeout", "1ms")
 
-    val conf = new SparkConf()
-      .setAppName(getClass.getName)
-      .set(SparkLauncher.SPARK_MASTER, "local[1]")
-      .set("spark.plugins", sparkPluginName)
-
-
-    sc = new SparkContext(conf)
     AsyncProfiling.start("")
     AsyncProfiling.dump("")
     AsyncProfiling.waitForResult(mock(classOf[OutputStream]))
-
-    sc.stop()
-    sc = null
   }
 
   test("cacheExecutorResult AsyncProfiling") {
     KylinConfig.getInstanceFromEnv.setProperty("kylin.query.async-profiler-result-timeout", "1ms")
 
-    val conf = new SparkConf()
-      .setAppName(getClass.getName)
-      .set(SparkLauncher.SPARK_MASTER, "local[1]")
-      .set("spark.plugins", sparkPluginName)
-
-    sc = new SparkContext(conf)
     AsyncProfiling.start("")
     AsyncProfiling.cacheExecutorResult("content", "1")
-
-    sc.stop()
-    sc = null
+    AsyncProfiling.dump("")
   }
 }
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
index 51843bfbbf..8d82c3c684 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
@@ -26,53 +26,36 @@ class QueryAsyncProfilerDriverPluginTest extends AsyncPluginWithMeta {
 
   val sparkPluginName: String = classOf[QueryAsyncProfilerSparkPlugin].getName
 
-  test("plugin initialization") {
+  override def beforeAll(): Unit = {
+    super.beforeAll()
     val conf = new SparkConf()
       .setAppName(getClass.getName)
       .set(SparkLauncher.SPARK_MASTER, "local[1]")
       .set("spark.plugins", sparkPluginName)
 
     sc = new SparkContext(conf)
+  }
+
+  test("plugin initialization") {
     Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins"))
     new QueryAsyncProfilerDriverPlugin().receive("NEX-1:start,event=cpu")
-
-    sc.stop()
-    sc = null
   }
 
   test("plugin initialization receive result") {
-    val conf = new SparkConf()
-      .setAppName(getClass.getName)
-      .set(SparkLauncher.SPARK_MASTER, "local[1]")
-      .set("spark.plugins", sparkPluginName)
-
-    sc = new SparkContext(conf)
     Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins"))
     try {
       new QueryAsyncProfilerDriverPlugin().receive("RES-1:flamegraph")
     } catch {
       case _: Throwable =>
     }
-
-    sc.stop()
-    sc = null
   }
 
   test("plugin initialization receive others") {
-    val conf = new SparkConf()
-      .setAppName(getClass.getName)
-      .set(SparkLauncher.SPARK_MASTER, "local[1]")
-      .set("spark.plugins", sparkPluginName)
-
-    sc = new SparkContext(conf)
     Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins"))
     try {
       new QueryAsyncProfilerDriverPlugin().receive("OTH-1:start,event=cpu")
     } catch {
       case _: Throwable =>
     }
-
-    sc.stop()
-    sc = null
   }
 }
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala
index bbe900f49d..36497869e2 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala
@@ -22,7 +22,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
 import org.apache.kylin.common.asyncprofiler.Message._
 import org.apache.kylin.common.asyncprofiler.{AsyncProfilerTool, AsyncProfilerUtils}
-import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.common.util.{ExecutorServiceUtil, HadoopUtil}
 import org.apache.spark.SparkContext
 import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
 import org.apache.spark.internal.Logging
@@ -71,7 +71,10 @@ class BuildAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
       val profile = new Runnable {
         override def run(): Unit = checkAction()
       }
-      log.debug(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
+      val deployMode = sc.getConf.get("spark.submit.deployMode", "")
+      log.info("Current spark.submit.deployMode: {}", deployMode)
+      AsyncProfilerTool.loadAsyncProfilerLib(deployMode.equals("client"))
+      log.info(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
       scheduledExecutorService.scheduleWithFixedDelay(
         profile, 0, checkingInterval, TimeUnit.MILLISECONDS)
 
@@ -129,6 +132,8 @@ class BuildAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
   override def shutdown(): Unit = {
     val fs: FileSystem = HadoopUtil.getFileSystem(statusFileName)
     HadoopUtil.writeStringToHdfs(fs, ProfilerStatus.CLOSED, statusFileName)
+    ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3)
+    super.shutdown()
   }
 
   def start(params: String): Unit = {