You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ya...@apache.org on 2021/06/22 11:41:54 UTC
[kylin] 01/03: KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster
This is an automated email from the ASF dual-hosted git repository.
yaqian pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5d552fe61e4a05b5f061805807f866ce53f4c03a
Author: tianhui5 <ti...@xiaomi.com>
AuthorDate: Mon Feb 22 10:15:46 2021 +0800
KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster
---
.../engine/spark/application/SparkApplication.java | 20 ++++++++++++--------
.../kylin/engine/spark/job/NSparkExecutable.java | 15 +++++++++++----
.../org/apache/spark/application/JobWorkSpace.scala | 6 +++++-
3 files changed, 28 insertions(+), 13 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 10af5cd..9baa490 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.application;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
@@ -33,14 +34,15 @@ import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -79,8 +81,14 @@ public abstract class SparkApplication {
protected BuildJobInfos infos;
public void execute(String[] args) {
- try {
- String argsLine = Files.readAllLines(Paths.get(args[0])).get(0);
+ Path path = new Path(args[0]);
+ try (
+ FileSystem fileSystem = FileSystem.get(path.toUri(), HadoopUtil.getCurrentConfiguration());
+ FSDataInputStream inputStream = fileSystem.open(path);
+ InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+ BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
+ ) {
+ String argsLine = bufferedReader.readLine();
if (argsLine.isEmpty()) {
throw new RuntimeException("Args file is empty");
}
@@ -312,10 +320,6 @@ public abstract class SparkApplication {
if (infos != null) {
infos.jobEnd();
}
- if (ss != null && !ss.conf().get("spark.master").startsWith("local")) {
- //JobMetricsUtils.unRegisterListener(ss);
- ss.stop();
- }
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 557627d..66fa91d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.spark.job;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
@@ -38,8 +39,8 @@ import java.util.Map.Entry;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -169,11 +170,15 @@ public class NSparkExecutable extends AbstractExecutable {
String dumpArgs() throws ExecuteException {
File tmpDir = null;
try {
- tmpDir = File.createTempFile(MetadataConstants.P_SEGMENT_IDS, "");
- FileUtils.writeByteArrayToFile(tmpDir, JsonUtil.writeValueAsBytes(getParams()));
+ String pathName = getId() + "_" + MetadataConstants.P_JOB_ID;
+ Path tgtPath = new Path(getConfig().getJobTmpDir(getParams().get("project")), pathName);
+ FileSystem fileSystem = FileSystem.get(tgtPath.toUri(), HadoopUtil.getCurrentConfiguration());
+ try (BufferedOutputStream outputStream = new BufferedOutputStream(fileSystem.create(tgtPath, false))) {
+ outputStream.write(JsonUtil.writeValueAsBytes(getParams()));
+ }
logger.info("Spark job args json is : {}.", JsonUtil.writeValueAsString(getParams()));
- return tmpDir.getCanonicalPath();
+ return tgtPath.toUri().toString();
} catch (IOException e) {
if (tmpDir != null && tmpDir.exists()) {
try {
@@ -382,6 +387,8 @@ public class NSparkExecutable extends AbstractExecutable {
if (StringUtils.isNotBlank(sparkUploadFiles)) {
sb.append("--files ").append(sparkUploadFiles).append(" ");
}
+ sb.append("--principal ").append(config.getKerberosPrincipal()).append(" ");
+ sb.append("--keytab ").append(config.getKerberosKeytabPath()).append(" ");
sb.append("--name job_step_%s ");
sb.append("--jars %s %s %s");
String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index c168632..a4ddf25 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,7 +35,11 @@ object JobWorkSpace extends Logging {
val worker = new JobWorker(application, appArgs, eventLoop)
val monitor = new JobMonitor(eventLoop)
val workspace = new JobWorkSpace(eventLoop, monitor, worker)
- System.exit(workspace.run())
+ if (System.getProperty("spark.master").equals("yarn") && System.getProperty("spark.submit.deployMode").equals("cluster")) {
+ workspace.run()
+ } else {
+ System.exit(workspace.run())
+ }
} catch {
case throwable: Throwable =>
logError("Error occurred when init job workspace.", throwable)