You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/12 09:28:11 UTC
[kylin] 15/17: KYLIN-5398 Flame graph availability fix
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d3ecc5a0847da26178e84cf13d55c0378764452e
Author: Guoliang Sun <gu...@kyligence.io>
AuthorDate: Thu Nov 10 15:13:06 2022 +0800
KYLIN-5398 Flame graph availability fix
---
.../libasyncProfiler-linux-arm64.so | Bin 0 -> 298256 bytes
.../libasyncProfiler-linux-x64.so | Bin 0 -> 303903 bytes
build/async-profiler-lib/libasyncProfiler-mac.so | Bin 0 -> 634272 bytes
build/release/compress.sh | 5 ++
.../java/org/apache/kylin/common/KapConfig.java | 13 +++-
.../org/apache/kylin/common/KylinConfigBase.java | 11 ++-
.../kylin/common/asyncprofiler/AsyncArchUtil.java | 77 +++++++++++++++++++++
.../kylin/common/asyncprofiler/AsyncProfiler.java | 69 +++++++++++++-----
.../common/asyncprofiler/AsyncProfilerUtils.java | 4 ++
.../libasyncProfiler-linux-arm64.so | Bin 0 -> 298256 bytes
.../libasyncProfiler-linux-x64.so | Bin 0 -> 303903 bytes
.../async-profiler-lib/libasyncProfiler-mac.so | Bin 0 -> 634272 bytes
.../async-profiler-lib/linux64/libasyncProfiler.so | Bin 314098 -> 0 bytes
.../async-profiler-lib/macOS/libasyncProfiler.so | Bin 239460 -> 0 bytes
.../AsyncProfilerExecutorPlugin.scala | 8 ++-
.../common/asyncprofiler/AsyncProfilerTool.scala | 8 ++-
.../apache/kylin/common/KylinConfigBaseTest.java | 1 +
.../common/asyncprofiler/AsyncArchUtilTest.java} | 34 ++++-----
.../common/asyncprofiler/AsyncProfilerTest.java | 17 +++--
.../asyncprofiler/AsyncProfilerToolTest.java | 24 +++++++
.../asyncprofiler/AsyncProfilerUtilsTest.java | 16 +++++
.../org/apache/kylin/rest/service/JobService.java | 3 +-
.../localmeta/lib/libasyncProfiler-linux-arm64.so | Bin 0 -> 298256 bytes
.../localmeta/lib/libasyncProfiler-linux-x64.so | Bin 0 -> 303903 bytes
.../localmeta/lib/libasyncProfiler-mac.so | Bin 0 -> 634272 bytes
.../kylin/engine/spark/job/NSparkExecutable.java | 55 +++++++++------
.../kylin/query/asyncprofiler/AsyncProfiling.scala | 9 ++-
.../QueryAsyncProfilerDriverPlugin.scala | 7 +-
.../query/asyncprofiler/AsyncPluginWithMeta.scala | 8 +--
.../query/asyncprofiler/AsyncProfilingTest.scala | 45 +++++-------
.../QueryAsyncProfilerDriverPluginTest.scala | 27 ++------
.../BuildAsyncProfilerDriverPlugin.scala | 9 ++-
32 files changed, 318 insertions(+), 132 deletions(-)
diff --git a/build/async-profiler-lib/libasyncProfiler-linux-arm64.so b/build/async-profiler-lib/libasyncProfiler-linux-arm64.so
new file mode 100644
index 0000000000..b959823506
Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-linux-arm64.so differ
diff --git a/build/async-profiler-lib/libasyncProfiler-linux-x64.so b/build/async-profiler-lib/libasyncProfiler-linux-x64.so
new file mode 100644
index 0000000000..6d961cec0d
Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-linux-x64.so differ
diff --git a/build/async-profiler-lib/libasyncProfiler-mac.so b/build/async-profiler-lib/libasyncProfiler-mac.so
new file mode 100644
index 0000000000..ad45237d5f
Binary files /dev/null and b/build/async-profiler-lib/libasyncProfiler-mac.so differ
diff --git a/build/release/compress.sh b/build/release/compress.sh
index 61c7f9eb9d..bf9d584b92 100755
--- a/build/release/compress.sh
+++ b/build/release/compress.sh
@@ -43,6 +43,11 @@ if [[ -d "influxdb" ]]; then
cp -rf postgresql ${package_name}/
fi
+# copy async profiler native files
+cp -rf async-profiler-lib/libasyncProfiler-mac.so "${package_name}"/lib/libasyncProfiler-mac.so
+cp -rf async-profiler-lib/libasyncProfiler-linux-x64.so "${package_name}"/lib/libasyncProfiler-linux-x64.so
+cp -rf async-profiler-lib/libasyncProfiler-linux-arm64.so "${package_name}"/lib/libasyncProfiler-linux-arm64.so
+
# Add ssb data preparation files
mkdir -p ${package_name}/tool/ssb
cp -rf ../src/examples/sample_cube/data ${package_name}/tool/ssb/
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
index 6cb7cbe9b2..06123ccf6c 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KapConfig.java
@@ -22,12 +22,14 @@ import java.io.File;
import java.io.IOException;
import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.annotation.Clarification;
import org.apache.kylin.common.util.EncryptUtil;
import org.apache.kylin.common.util.FileUtils;
+@Slf4j
@Clarification(priority = Clarification.Priority.MAJOR, msg = "Enterprise")
public class KapConfig {
@@ -248,7 +250,7 @@ public class KapConfig {
}
/**
- * Smart modeling
+ * Smart modeling
*/
public String getSmartModelingConf(String conf) {
return config.getOptional("kylin.smart.conf." + conf, null);
@@ -515,16 +517,21 @@ public class KapConfig {
public String sparderFiles() {
try {
- File storageFile = new File(getKylinConfig().getLogSparkExecutorPropertiesFile());
+ File storageFile = new File(config.getLogSparkExecutorPropertiesFile());
String additionalFiles = storageFile.getCanonicalPath();
- storageFile = new File(getKylinConfig().getLogSparkAppMasterPropertiesFile());
+ storageFile = new File(config.getLogSparkAppMasterPropertiesFile());
if (additionalFiles.isEmpty()) {
additionalFiles = storageFile.getCanonicalPath();
} else {
additionalFiles = additionalFiles + "," + storageFile.getCanonicalPath();
}
+ if (config.asyncProfilingEnabled()) {
+ additionalFiles = additionalFiles + "," + config.getAsyncProfilerFiles();
+ }
+ log.info("Sparder additionalFiles: {}", additionalFiles);
return config.getOptional("kylin.query.engine.sparder-additional-files", additionalFiles);
} catch (IOException e) {
+ log.error("Add sparderFiles failed, " + e);
return "";
}
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 0802fc242e..557901c837 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -19,6 +19,8 @@
package org.apache.kylin.common;
import static java.lang.Math.toIntExact;
+import static org.apache.kylin.common.asyncprofiler.AsyncProfiler.ASYNC_PROFILER_LIB_LINUX_ARM64;
+import static org.apache.kylin.common.asyncprofiler.AsyncProfiler.ASYNC_PROFILER_LIB_LINUX_X64;
import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_CONNECTION_URL_KEY;
import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_DRIVER_KEY;
import static org.apache.kylin.common.constant.Constants.KYLIN_SOURCE_JDBC_PASS_KEY;
@@ -2636,6 +2638,13 @@ public abstract class KylinConfigBase implements Serializable {
return getLogPropertyFile("spark-appmaster-log4j.xml");
}
+ public String getAsyncProfilerFiles() throws IOException {
+ String kylinHome = getKylinHomeWithoutWarn();
+ File libX64 = new File(kylinHome + "/lib/" + ASYNC_PROFILER_LIB_LINUX_X64);
+ File libArm64 = new File(kylinHome + "/lib/" + ASYNC_PROFILER_LIB_LINUX_ARM64);
+ return libX64.getCanonicalPath() + "," + libArm64.getCanonicalPath();
+ }
+
private String getLogPropertyFile(String filename) {
String parentFolder;
if (isDevEnv()) {
@@ -3597,7 +3606,7 @@ public abstract class KylinConfigBase implements Serializable {
public boolean buildJobProfilingEnabled() {
return !Boolean.parseBoolean(System.getProperty("spark.local", FALSE))
- && Boolean.parseBoolean(getOptional("kylin.engine.async-profiler-enabled", FALSE));
+ && Boolean.parseBoolean(getOptional("kylin.engine.async-profiler-enabled", TRUE));
}
public long buildJobProfilingResultTimeout() {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java
new file mode 100644
index 0000000000..a1b77ab622
--- /dev/null
+++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtil.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.asyncprofiler;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+public class AsyncArchUtil {
+
+ public enum ArchType {
+ LINUX_X64(), LINUX_ARM64()
+ }
+
+ private static final Map<String, ArchType> ARCH_TO_PROCESSOR = new HashMap<>();
+
+ static {
+ addProcessors(ArchType.LINUX_X64, "x86_64", "amd64");
+ addProcessors(ArchType.LINUX_ARM64, "aarch64");
+ }
+
+ public static ArchType getProcessor() {
+ return getProcessor(getSystemProperty("os.arch"));
+ }
+
+ /**
+ * Returns a {@link ArchType} object with the given value {@link String}. The {@link String} must be
+ * like a value returned by the os.arch System Property.
+ *
+ * @param osArch A {@link String} like a value returned by the os.arch System Property.
+ * @return A {@link ArchType} when it exists, else {@code null}.
+ */
+ public static ArchType getProcessor(final String osArch) {
+ return ARCH_TO_PROCESSOR.get(osArch);
+ }
+
+ /**
+ * <p>
+ * Gets a System property, defaulting to {@code null} if the property cannot be read.
+ * </p>
+ * <p>
+ * If a {@code SecurityException} is caught, the return value is {@code null} and a message is written to
+ * {@code System.err}.
+ * </p>
+ *
+ * @param property the system property name
+ * @return the system property value or {@code null} if a security problem occurs
+ */
+ private static String getSystemProperty(final String property) {
+ try {
+ return System.getProperty(property);
+ } catch (final SecurityException ex) {
+ // we are not allowed to look at this property, the SystemUtils property value will default to null.
+ return null;
+ }
+ }
+
+ private static void addProcessors(ArchType archType, final String... keys) {
+ Stream.of(keys).forEach(key -> ARCH_TO_PROCESSOR.put(key, archType));
+ }
+}
\ No newline at end of file
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java
index c49cdffe20..a0ba5f13a1 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfiler.java
@@ -18,27 +18,31 @@
package org.apache.kylin.common.asyncprofiler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Objects;
+import org.apache.kylin.common.KylinConfigBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class AsyncProfiler {
private static final Logger logger = LoggerFactory.getLogger(AsyncProfiler.class);
- private static final String LIB_FILE = "libasyncProfiler.so";
- private static final String LIB_PARENT = "/async-profiler-lib";
- private static final String MAC_LIB_PATH = LIB_PARENT + "/macOS/" + LIB_FILE;
- private static final String LINUX_64_LIB_PATH = LIB_PARENT + "/linux64/" + LIB_FILE;
+ // async profiler native files
+ public static final String ASYNC_PROFILER_LIB_MAC = "libasyncProfiler-mac.so";
+ public static final String ASYNC_PROFILER_LIB_LINUX_X64 = "libasyncProfiler-linux-x64.so";
+ public static final String ASYNC_PROFILER_LIB_LINUX_ARM64 = "libasyncProfiler-linux-arm64.so";
+ private static final String LIB_PARENT = "/async-profiler-lib/";
private static AsyncProfiler profiler;
private boolean loaded = false;
- public static synchronized AsyncProfiler getInstance() {
+ public static synchronized AsyncProfiler getInstance(boolean loadLocalLib) {
if (profiler == null) {
- profiler = new AsyncProfiler();
+ profiler = new AsyncProfiler(loadLocalLib);
}
return profiler;
}
@@ -55,14 +59,43 @@ public class AsyncProfiler {
logger.info("Test arg for ut: {}", ignore);
}
- private AsyncProfiler() {
+ private AsyncProfiler(boolean loadLocalLib) {
try {
boolean isTestingOnLocalMac = System.getProperty("os.name", "").contains("Mac")
|| System.getProperty("os.name", "").contains("OS X");
if (isTestingOnLocalMac) {
- loadLibAsyncProfilerSO(MAC_LIB_PATH);
+ loadLibAsyncProfilerSO(LIB_PARENT + ASYNC_PROFILER_LIB_MAC);
} else {
- loadLibAsyncProfilerSO(LINUX_64_LIB_PATH);
+ String libName;
+ File libPath;
+
+ // Select native lib loading based on machine architecture
+ AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor();
+ logger.info("Machine's archType: {}", archType);
+ switch (archType) {
+ case LINUX_ARM64:
+ libName = ASYNC_PROFILER_LIB_LINUX_ARM64;
+ break;
+ case LINUX_X64:
+ default:
+ libName = ASYNC_PROFILER_LIB_LINUX_X64;
+ break;
+ }
+
+ // Adapting load paths based on Spark deployment patterns
+ if (loadLocalLib) {
+ libPath = new File(KylinConfigBase.getKylinHome() + "/lib/" + libName);
+ } else {
+ libPath = new File(libName);
+ }
+ logger.info("AsyncProfiler libPath: {}, exists: {}", libPath.getAbsolutePath(),
+ Files.exists(libPath.toPath()));
+ // check this for ut
+ if (libPath.exists()) {
+ System.load(libPath.getAbsolutePath());
+ } else {
+ loadLibAsyncProfilerSO(LIB_PARENT + libName);
+ }
}
loaded = true;
} catch (Exception e) {
@@ -71,12 +104,12 @@ public class AsyncProfiler {
}
private void loadLibAsyncProfilerSO(String libPath) throws IOException {
- final java.nio.file.Path tmpLib = java.io.File.createTempFile("libasyncProfiler", ".so").toPath();
- java.nio.file.Files.copy(
- Objects.requireNonNull(AsyncProfilerTool.class.getResourceAsStream(libPath)),
- tmpLib,
- java.nio.file.StandardCopyOption.REPLACE_EXISTING);
- System.load(tmpLib.toAbsolutePath().toString());
+ File asyncProfilerLib = File.createTempFile("libasyncProfiler", ".so");
+ java.nio.file.Files.copy(Objects.requireNonNull(AsyncProfilerTool.class.getResourceAsStream(libPath)),
+ asyncProfilerLib.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+ logger.info("AsyncProfiler will try to load from libPath: {}, exists: {}", asyncProfilerLib.getAbsolutePath(),
+ asyncProfilerLib.exists());
+ System.load(asyncProfilerLib.getAbsolutePath());
}
public boolean isLoaded() {
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
index e86648268a..cd74a7cd33 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtils.java
@@ -53,6 +53,10 @@ public class AsyncProfilerUtils {
this.cachedResult = countDownLatch;
}
+ public void build(File localCacheDir) {
+ this.localCacheDir = localCacheDir;
+ }
+
public void build(long resultCollectionTimeout, File localCacheDir) {
this.resultCollectionTimeout = resultCollectionTimeout;
this.localCacheDir = localCacheDir;
diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so
new file mode 100644
index 0000000000..b959823506
Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-arm64.so differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so
new file mode 100644
index 0000000000..6d961cec0d
Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-linux-x64.so differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so
new file mode 100644
index 0000000000..ad45237d5f
Binary files /dev/null and b/src/core-common/src/main/resources/async-profiler-lib/libasyncProfiler-mac.so differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so b/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so
deleted file mode 100755
index 4153f52868..0000000000
Binary files a/src/core-common/src/main/resources/async-profiler-lib/linux64/libasyncProfiler.so and /dev/null differ
diff --git a/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so b/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so
deleted file mode 100755
index c7298c6b82..0000000000
Binary files a/src/core-common/src/main/resources/async-profiler-lib/macOS/libasyncProfiler.so and /dev/null differ
diff --git a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala
index fe4a3918a2..e423c98f81 100644
--- a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala
+++ b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerExecutorPlugin.scala
@@ -20,6 +20,7 @@ package org.apache.kylin.common.asyncprofiler
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.kylin.common.asyncprofiler.Message._
+import org.apache.kylin.common.util.ExecutorServiceUtil
import org.apache.spark.api.plugin.{ExecutorPlugin, PluginContext}
import org.apache.spark.internal.Logging
@@ -40,7 +41,8 @@ class AsyncProfilerExecutorPlugin extends ExecutorPlugin with Logging {
val profile = new Runnable {
override def run(): Unit = checkAndProfile()
}
- log.debug(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
+ AsyncProfilerTool.loadAsyncProfilerLib(false)
+ log.info(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
scheduledExecutorService.scheduleWithFixedDelay(
profile, 0, checkingInterval, TimeUnit.MILLISECONDS)
}
@@ -80,4 +82,8 @@ class AsyncProfilerExecutorPlugin extends ExecutorPlugin with Logging {
ctx.send(msg)
}
+ override def shutdown(): Unit = {
+ ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3)
+ super.shutdown()
+ }
}
diff --git a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala
index 0deb703da9..69bb7c72bb 100644
--- a/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala
+++ b/src/core-common/src/main/scala/org/apache/kylin/common/asyncprofiler/AsyncProfilerTool.scala
@@ -27,7 +27,13 @@ object AsyncProfilerTool {
val log: Logger = LoggerFactory.getLogger(AsyncProfilerTool.getClass)
- private val profiler = AsyncProfiler.getInstance()
+ private var profiler: AsyncProfiler = _
+
+ def loadAsyncProfilerLib(loadLocalLib: Boolean): Unit = {
+ // Local - load Sparder Driver or (Spark Driver which in client mode)
+ // Remote - load all Executors or (Spark Engine which in cluster mode)
+ profiler = AsyncProfiler.getInstance(loadLocalLib)
+ }
private var _running = false
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
index f2c3cb1a2f..f60a2b003b 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/KylinConfigBaseTest.java
@@ -1191,6 +1191,7 @@ class KylinConfigBaseTest {
@Test
void testBuildJobProfilingEnabled() {
KylinConfig config = KylinConfig.getInstanceFromEnv();
+ config.setProperty("kylin.engine.async-profiler-enabled", "false");
assertFalse(config.buildJobProfilingEnabled());
config.setProperty("kylin.engine.async-profiler-enabled", "true");
assertTrue(config.buildJobProfilingEnabled());
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java
similarity index 50%
copy from src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
copy to src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java
index 68f344186b..0ba16c4a75 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncArchUtilTest.java
@@ -16,29 +16,25 @@
* limitations under the License.
*/
-package org.apache.kylin.query.asyncprofiler
+package org.apache.kylin.common.asyncprofiler;
-import org.apache.spark.SparkContext
-import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
-import org.apache.spark.internal.Logging
+import org.junit.Assert;
+import org.junit.Test;
-import java.util
+public class AsyncArchUtilTest {
-class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
-
- override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = super.init(sc, pluginContext)
+ @Test
+ public void testGetProcessor() {
+ AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor();
+ Assert.assertNotNull(archType);
+ }
- override def receive(message: Any): AnyRef = {
- import org.apache.kylin.common.asyncprofiler.Message._
+ @Test
+ public void testArchType() {
+ AsyncArchUtil.ArchType archType = AsyncArchUtil.getProcessor("x86_64");
+ Assert.assertEquals(AsyncArchUtil.ArchType.LINUX_X64, archType);
- val (command, executorId, param) = processMessage(message.toString)
- command match {
- case NEXT_COMMAND =>
- AsyncProfiling.nextCommand()
- case RESULT =>
- AsyncProfiling.cacheExecutorResult(param, executorId)
- ""
- case _ => ""
+ archType = AsyncArchUtil.getProcessor("aarch64");
+ Assert.assertEquals(AsyncArchUtil.ArchType.LINUX_ARM64, archType);
}
- }
}
\ No newline at end of file
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java
index 82ab3f7a35..176d9ad119 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerTest.java
@@ -26,8 +26,13 @@ import org.junit.Test;
public class AsyncProfilerTest {
@Test
- public void testLoaded() {
- Assert.assertTrue(AsyncProfiler.getInstance().isLoaded());
+ public void testLocalLoaded() {
+ Assert.assertTrue(AsyncProfiler.getInstance(true).isLoaded());
+ }
+
+ @Test
+ public void testRemoteLoaded() {
+ Assert.assertTrue(AsyncProfiler.getInstance(false).isLoaded());
}
// This may success in local Mac, but failed in CI
@@ -36,7 +41,7 @@ public class AsyncProfilerTest {
System.setProperty("os.name", "Mac");
String errorMsg = "";
try {
- AsyncProfiler.getInstance();
+ AsyncProfiler.getInstance(true);
} catch (Throwable throwable) {
errorMsg = throwable.getMessage();
}
@@ -45,7 +50,7 @@ public class AsyncProfilerTest {
@Test
public void testExecute() throws IOException {
- AsyncProfiler asyncProfiler = AsyncProfiler.getInstance();
+ AsyncProfiler asyncProfiler = AsyncProfiler.getInstance(true);
try {
asyncProfiler.execute("start,event=cpu");
asyncProfiler.stop();
@@ -59,12 +64,12 @@ public class AsyncProfilerTest {
@Test
public void testStop() {
Assert.assertThrows("Profiler is not active", IllegalStateException.class,
- AsyncProfiler.getInstance()::stop);
+ AsyncProfiler.getInstance(true)::stop);
}
@Test
public void testAsyncProfilerUtInstance() {
- AsyncProfiler originInstance = AsyncProfiler.getInstance();
+ AsyncProfiler originInstance = AsyncProfiler.getInstance(true);
AsyncProfiler utInstance = AsyncProfiler.utInstance();
Assert.assertSame(originInstance, utInstance);
}
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java
index 85b73a80bc..2cdb832339 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerToolTest.java
@@ -40,6 +40,30 @@ public class AsyncProfilerToolTest extends NLocalFileMetadataTestCase {
this.cleanupTestMetadata();
}
+ @Test
+ public void testLoadLocalAsyncProfilerLib() {
+ AsyncProfilerTool.loadAsyncProfilerLib(true);
+ String errorMsg = "";
+ try {
+ AsyncProfilerTool.status();
+ } catch (Exception e) {
+ errorMsg = e.getMessage();
+ }
+ Assert.assertTrue(errorMsg.isEmpty());
+ }
+
+ @Test
+ public void testLoadRemoteAsyncProfilerLib() {
+ AsyncProfilerTool.loadAsyncProfilerLib(false);
+ String errorMsg = "";
+ try {
+ AsyncProfilerTool.status();
+ } catch (Exception e) {
+ errorMsg = e.getMessage();
+ }
+ Assert.assertTrue(errorMsg.isEmpty());
+ }
+
@Test
public void testStartAndStop() {
try {
diff --git a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java
index 7eb636839a..fba1ad54a6 100644
--- a/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java
+++ b/src/core-common/src/test/java/org/apache/kylin/common/asyncprofiler/AsyncProfilerUtilsTest.java
@@ -57,6 +57,22 @@ public class AsyncProfilerUtilsTest {
Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir);
}
+ @Test
+ public void testBuildWithNewLocalCacheDir() throws IOException {
+ AsyncProfilerUtils asyncProfilerUtilsBuild = AsyncProfilerUtils.getInstance();
+ asyncProfilerUtilsBuild.build(new CountDownLatch(2));
+ Assert.assertEquals(2, asyncProfilerUtilsBuild.cachedResult.getCount());
+
+ File testFile = Files.createTempDirectory("ke-build-async-test-profiler-").toFile();
+ asyncProfilerUtilsBuild.build(2L, testFile);
+ Assert.assertEquals(2L, asyncProfilerUtilsBuild.resultCollectionTimeout);
+ Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir);
+
+ testFile = Files.createTempDirectory("ke-build-async-test-profiler-").toFile();
+ asyncProfilerUtilsBuild.build(testFile);
+ Assert.assertEquals(testFile, asyncProfilerUtilsBuild.localCacheDir);
+ }
+
@Test
public void testWaitForResultTimeout() throws IOException, InterruptedException {
AsyncProfilerUtils asyncProfilerUtils = AsyncProfilerUtils.getInstance();
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
index 22424681c5..494ac3b50e 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -47,7 +47,6 @@ import java.util.stream.Stream;
import javax.servlet.http.HttpServletRequest;
-import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.IOUtils;
@@ -98,6 +97,7 @@ import org.apache.kylin.job.execution.StageBase;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
+import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.model.FusionModel;
import org.apache.kylin.metadata.model.FusionModelManager;
import org.apache.kylin.metadata.model.NDataModel;
@@ -1340,6 +1340,7 @@ public class JobService extends BasicService implements JobSupporter, ISmartAppl
}
public void setResponseLanguage(HttpServletRequest request) {
+ aclEvaluate.checkIsGlobalAdmin();
String languageToHandle = request.getHeader(HttpHeaders.ACCEPT_LANGUAGE);
if (languageToHandle == null) {
ErrorCode.setMsg("cn");
diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so
new file mode 100644
index 0000000000..b959823506
Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-arm64.so differ
diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so
new file mode 100644
index 0000000000..6d961cec0d
Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-linux-x64.so differ
diff --git a/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so
new file mode 100644
index 0000000000..ad45237d5f
Binary files /dev/null and b/src/examples/test_case_data/localmeta/lib/libasyncProfiler-mac.so differ
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index a842c926ca..d31d02f83e 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -18,13 +18,23 @@
package org.apache.kylin.engine.spark.job;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
-import lombok.val;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -60,26 +70,18 @@ import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.plugin.asyncprofiler.BuildAsyncProfilerSparkPlugin;
import org.apache.parquet.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.val;
/**
*
@@ -883,6 +885,13 @@ public class NSparkExecutable extends AbstractExecutable implements ChainedStage
filePaths.add(kylinConf.getLogSparkAppMasterPropertiesFile());
filePaths.add(kylinConf.getLogSparkDriverPropertiesFile());
filePaths.add(kylinConf.getLogSparkExecutorPropertiesFile());
+ if (kylinConf.buildJobProfilingEnabled()) {
+ try {
+ filePaths.add(kylinConf.getAsyncProfilerFiles());
+ } catch (IOException e) {
+ logger.error("Add SparkPluginFile failed.", e);
+ }
+ }
filePaths.add(sparkConf.get(SPARK_FILES_1));
filePaths.add(sparkConf.get(SPARK_FILES_2));
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
index e1dc51869c..d27cfc76f5 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/AsyncProfiling.scala
@@ -25,14 +25,14 @@ import org.apache.kylin.common.exception.{KylinException, QueryErrorCode}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparderEnv
-import java.io.OutputStream
+import java.io.{File, OutputStream}
import java.nio.file.Files
import java.util.concurrent.CountDownLatch
object AsyncProfiling extends Logging {
- private val localCacheDir = Files.createTempDirectory("ke-async-profiler-result-").toFile
+ var localCacheDir: File = Files.createTempDirectory("ke-async-profiler-result-").toFile
localCacheDir.deleteOnExit()
private val resultCollectionTimeout = KylinConfig.getInstanceFromEnv.asyncProfilingResultTimeout
private val profilingTimeout = KylinConfig.getInstanceFromEnv.asyncProfilingProfileTimeout
@@ -57,6 +57,11 @@ object AsyncProfiling extends Logging {
throw new KylinException(QueryErrorCode.PROFILING_ALREADY_STARTED, "profiling is already started, stop it first")
}
logDebug("profiler start")
+ // Linux may periodically clean up the files in the /tmp directory
+ if (!localCacheDir.exists()) {
+ localCacheDir = Files.createTempDirectory("ke-async-profiler-result-").toFile
+ asyncProfilerUtils.build(localCacheDir)
+ }
asyncProfilerUtils.cleanLocalCache()
// expecting driver + count(executor) amount of results
cachedResult = new CountDownLatch(
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
index 68f344186b..8af9631561 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPlugin.scala
@@ -18,6 +18,7 @@
package org.apache.kylin.query.asyncprofiler
+import org.apache.kylin.common.asyncprofiler.AsyncProfilerTool
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
import org.apache.spark.internal.Logging
@@ -26,7 +27,11 @@ import java.util
class QueryAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
- override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = super.init(sc, pluginContext)
+ override def init(sc: SparkContext, pluginContext: PluginContext): util.Map[String, String] = {
+ // Sparder Driver and KE are always in one JVM, in client mode
+ AsyncProfilerTool.loadAsyncProfilerLib(true)
+ super.init(sc, pluginContext)
+ }
override def receive(message: Any): AnyRef = {
import org.apache.kylin.common.asyncprofiler.Message._
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
index d94a560dd8..9db5d72000 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncPluginWithMeta.scala
@@ -19,7 +19,6 @@
package org.apache.kylin.query.asyncprofiler
import org.apache.kylin.common.util.NLocalFileMetadataTestCase
-import org.apache.spark.sql.{SparderEnv, SparkSession}
import org.apache.spark.{SparkContext, SparkFunSuite}
import org.scalatest.BeforeAndAfterAll
@@ -44,11 +43,10 @@ trait AsyncPluginWithMeta extends SparkFunSuite with BeforeAndAfterAll {
}
protected def clearSparkSession(): Unit = {
- if (SparderEnv.isSparkAvailable) {
- SparderEnv.getSparkSession.stop()
+ if (sc != null) {
+ sc.stop()
+ sc = null
}
- SparkSession.setActiveSession(null)
- SparkSession.setDefaultSession(null)
}
override def beforeAll(): Unit = {
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
index c05923664d..21c08c4ee7 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/AsyncProfilingTest.scala
@@ -24,7 +24,7 @@ import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.{SparkConf, SparkContext}
import org.mockito.Mockito.mock
-import java.io.OutputStream
+import java.io.{File, OutputStream}
class AsyncProfilingTest extends AsyncPluginWithMeta {
@@ -34,55 +34,46 @@ class AsyncProfilingTest extends AsyncPluginWithMeta {
val statusFileName: String = flagFileDir + "/status"
val dumpFileName: String = flagFileDir + "/dump.tar.gz"
- test("init AsyncProfiling") {
- AsyncProfiling.asyncProfilerUtils
- }
-
- test("start and dump AsyncProfiling") {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf()
.setAppName(getClass.getName)
.set(SparkLauncher.SPARK_MASTER, "local[1]")
.set("spark.plugins", sparkPluginName)
sc = new SparkContext(conf)
+ }
+
+ test("init AsyncProfiling") {
+ AsyncProfiling.asyncProfilerUtils
+ }
+
+ test("start and dump AsyncProfiling") {
AsyncProfiling.start("")
AsyncProfiling.dump("")
+ }
- sc.stop()
- sc = null
+ test("start with localCacheDir by delete") {
+ AsyncProfiling.nextCommand()
+ val localCacheDir = AsyncProfiling.localCacheDir
+ new File(localCacheDir.getAbsolutePath).delete()
+ AsyncProfiling.start("")
+ AsyncProfiling.dump("")
}
test("waitForResult AsyncProfiling") {
KylinConfig.getInstanceFromEnv.setProperty("kylin.query.async-profiler-result-timeout", "1ms")
- val conf = new SparkConf()
- .setAppName(getClass.getName)
- .set(SparkLauncher.SPARK_MASTER, "local[1]")
- .set("spark.plugins", sparkPluginName)
-
-
- sc = new SparkContext(conf)
AsyncProfiling.start("")
AsyncProfiling.dump("")
AsyncProfiling.waitForResult(mock(classOf[OutputStream]))
-
- sc.stop()
- sc = null
}
test("cacheExecutorResult AsyncProfiling") {
KylinConfig.getInstanceFromEnv.setProperty("kylin.query.async-profiler-result-timeout", "1ms")
- val conf = new SparkConf()
- .setAppName(getClass.getName)
- .set(SparkLauncher.SPARK_MASTER, "local[1]")
- .set("spark.plugins", sparkPluginName)
-
- sc = new SparkContext(conf)
AsyncProfiling.start("")
AsyncProfiling.cacheExecutorResult("content", "1")
-
- sc.stop()
- sc = null
+ AsyncProfiling.dump("")
}
}
diff --git a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
index 51843bfbbf..8d82c3c684 100644
--- a/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
+++ b/src/spark-project/sparder/src/test/scala/org/apache/kylin/query/asyncprofiler/QueryAsyncProfilerDriverPluginTest.scala
@@ -26,53 +26,36 @@ class QueryAsyncProfilerDriverPluginTest extends AsyncPluginWithMeta {
val sparkPluginName: String = classOf[QueryAsyncProfilerSparkPlugin].getName
- test("plugin initialization") {
+ override def beforeAll(): Unit = {
+ super.beforeAll()
val conf = new SparkConf()
.setAppName(getClass.getName)
.set(SparkLauncher.SPARK_MASTER, "local[1]")
.set("spark.plugins", sparkPluginName)
sc = new SparkContext(conf)
+ }
+
+ test("plugin initialization") {
Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins"))
new QueryAsyncProfilerDriverPlugin().receive("NEX-1:start,event=cpu")
-
- sc.stop()
- sc = null
}
test("plugin initialization receive result") {
- val conf = new SparkConf()
- .setAppName(getClass.getName)
- .set(SparkLauncher.SPARK_MASTER, "local[1]")
- .set("spark.plugins", sparkPluginName)
-
- sc = new SparkContext(conf)
Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins"))
try {
new QueryAsyncProfilerDriverPlugin().receive("RES-1:flamegraph")
} catch {
case _: Throwable =>
}
-
- sc.stop()
- sc = null
}
test("plugin initialization receive others") {
- val conf = new SparkConf()
- .setAppName(getClass.getName)
- .set(SparkLauncher.SPARK_MASTER, "local[1]")
- .set("spark.plugins", sparkPluginName)
-
- sc = new SparkContext(conf)
Assert.assertEquals(sparkPluginName, sc.getConf.get("spark.plugins"))
try {
new QueryAsyncProfilerDriverPlugin().receive("OTH-1:start,event=cpu")
} catch {
case _: Throwable =>
}
-
- sc.stop()
- sc = null
}
}
diff --git a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala
index bbe900f49d..36497869e2 100644
--- a/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala
+++ b/src/spark-project/spark-common/src/main/scala/org/apache/kylin/plugin/asyncprofiler/BuildAsyncProfilerDriverPlugin.scala
@@ -22,7 +22,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
import org.apache.kylin.common.asyncprofiler.Message._
import org.apache.kylin.common.asyncprofiler.{AsyncProfilerTool, AsyncProfilerUtils}
-import org.apache.kylin.common.util.HadoopUtil
+import org.apache.kylin.common.util.{ExecutorServiceUtil, HadoopUtil}
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, PluginContext}
import org.apache.spark.internal.Logging
@@ -71,7 +71,10 @@ class BuildAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
val profile = new Runnable {
override def run(): Unit = checkAction()
}
- log.debug(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
+ val deployMode = sc.getConf.get("spark.submit.deployMode", "")
+ log.info("Current spark.submit.deployMode: {}", deployMode)
+ AsyncProfilerTool.loadAsyncProfilerLib(deployMode.equals("client"))
+ log.info(s"AsyncProfiler status: ${AsyncProfilerTool.status()}")
scheduledExecutorService.scheduleWithFixedDelay(
profile, 0, checkingInterval, TimeUnit.MILLISECONDS)
@@ -129,6 +132,8 @@ class BuildAsyncProfilerDriverPlugin extends DriverPlugin with Logging {
override def shutdown(): Unit = {
val fs: FileSystem = HadoopUtil.getFileSystem(statusFileName)
HadoopUtil.writeStringToHdfs(fs, ProfilerStatus.CLOSED, statusFileName)
+ ExecutorServiceUtil.shutdownGracefully(scheduledExecutorService, 3)
+ super.shutdown()
}
def start(params: String): Unit = {