You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ya...@apache.org on 2021/06/22 11:41:54 UTC

[kylin] 01/03: KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster

This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5d552fe61e4a05b5f061805807f866ce53f4c03a
Author: tianhui5 <ti...@xiaomi.com>
AuthorDate: Mon Feb 22 10:15:46 2021 +0800

    KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster
---
 .../engine/spark/application/SparkApplication.java   | 20 ++++++++++++--------
 .../kylin/engine/spark/job/NSparkExecutable.java     | 15 +++++++++++----
 .../org/apache/spark/application/JobWorkSpace.scala  |  6 +++++-
 3 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 10af5cd..9baa490 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.application;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -33,14 +34,15 @@ import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -79,8 +81,14 @@ public abstract class SparkApplication {
     protected BuildJobInfos infos;
 
     public void execute(String[] args) {
-        try {
-            String argsLine = Files.readAllLines(Paths.get(args[0])).get(0);
+        Path path = new Path(args[0]);
+        try (
+                FileSystem fileSystem = FileSystem.get(path.toUri(), HadoopUtil.getCurrentConfiguration());
+                FSDataInputStream inputStream = fileSystem.open(path);
+                InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
+        ) {
+            String argsLine = bufferedReader.readLine();
             if (argsLine.isEmpty()) {
                 throw new RuntimeException("Args file is empty");
             }
@@ -312,10 +320,6 @@ public abstract class SparkApplication {
             if (infos != null) {
                 infos.jobEnd();
             }
-            if (ss != null && !ss.conf().get("spark.master").startsWith("local")) {
-                //JobMetricsUtils.unRegisterListener(ss);
-                ss.stop();
-            }
         }
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 557627d..66fa91d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.job;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 
@@ -38,8 +39,8 @@ import java.util.Map.Entry;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -169,11 +170,15 @@ public class NSparkExecutable extends AbstractExecutable {
     String dumpArgs() throws ExecuteException {
         File tmpDir = null;
         try {
-            tmpDir = File.createTempFile(MetadataConstants.P_SEGMENT_IDS, "");
-            FileUtils.writeByteArrayToFile(tmpDir, JsonUtil.writeValueAsBytes(getParams()));
+            String pathName = getId() + "_" + MetadataConstants.P_JOB_ID;
+            Path tgtPath = new Path(getConfig().getJobTmpDir(getParams().get("project")), pathName);
+            FileSystem fileSystem = FileSystem.get(tgtPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            try (BufferedOutputStream outputStream = new BufferedOutputStream(fileSystem.create(tgtPath, false))) {
+                outputStream.write(JsonUtil.writeValueAsBytes(getParams()));
+            }
 
             logger.info("Spark job args json is : {}.", JsonUtil.writeValueAsString(getParams()));
-            return tmpDir.getCanonicalPath();
+            return tgtPath.toUri().toString();
         } catch (IOException e) {
             if (tmpDir != null && tmpDir.exists()) {
                 try {
@@ -382,6 +387,8 @@ public class NSparkExecutable extends AbstractExecutable {
         if (StringUtils.isNotBlank(sparkUploadFiles)) {
             sb.append("--files ").append(sparkUploadFiles).append(" ");
         }
+        sb.append("--principal ").append(config.getKerberosPrincipal()).append(" ");
+        sb.append("--keytab ").append(config.getKerberosKeytabPath()).append(" ");
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index c168632..a4ddf25 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,7 +35,11 @@ object JobWorkSpace extends Logging {
       val worker = new JobWorker(application, appArgs, eventLoop)
       val monitor = new JobMonitor(eventLoop)
       val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-      System.exit(workspace.run())
+      if (System.getProperty("spark.master").equals("yarn") && System.getProperty("spark.submit.deployMode").equals("cluster")) {
+        workspace.run()
+      } else {
+        System.exit(workspace.run())
+      }
     } catch {
       case throwable: Throwable =>
         logError("Error occurred when init job workspace.", throwable)