You are viewing a plain-text version of this content. The canonical link to it appeared as a hyperlink on the original page and is not preserved in this text.
Posted to commits@kylin.apache.org by ya...@apache.org on 2021/06/22 11:41:53 UTC

[kylin] branch kylin-on-parquet-v2 updated (23ecb27 -> 25080df)

This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a change to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from 23ecb27  KYLIN-5013 Write table_snapshot to wrong cluster in Kylin4.0
     new 5d552fe  KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster
     new ba33480  force delete spark deployMode in NSparkLocalStep
     new 25080df  add kerberos usage check before set spark submit conf

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../engine/spark/application/SparkApplication.java   | 20 ++++++++++++--------
 .../kylin/engine/spark/job/NSparkExecutable.java     | 17 +++++++++++++----
 .../kylin/engine/spark/job/NSparkLocalStep.java      |  3 ++-
 .../org/apache/spark/application/JobWorkSpace.scala  |  6 +++++-
 4 files changed, 32 insertions(+), 14 deletions(-)

[kylin] 01/03: KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster

Posted by ya...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5d552fe61e4a05b5f061805807f866ce53f4c03a
Author: tianhui5 <ti...@xiaomi.com>
AuthorDate: Mon Feb 22 10:15:46 2021 +0800

    KYLIN-4895 change spark deploy mode of kylin4.0 engine from local to cluster
---
 .../engine/spark/application/SparkApplication.java   | 20 ++++++++++++--------
 .../kylin/engine/spark/job/NSparkExecutable.java     | 15 +++++++++++----
 .../org/apache/spark/application/JobWorkSpace.scala  |  6 +++++-
 3 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 10af5cd..9baa490 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark.application;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
@@ -33,14 +34,15 @@ import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
 import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
+
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -79,8 +81,14 @@ public abstract class SparkApplication {
     protected BuildJobInfos infos;
 
     public void execute(String[] args) {
-        try {
-            String argsLine = Files.readAllLines(Paths.get(args[0])).get(0);
+        Path path = new Path(args[0]);
+        try (
+                FileSystem fileSystem = FileSystem.get(path.toUri(), HadoopUtil.getCurrentConfiguration());
+                FSDataInputStream inputStream = fileSystem.open(path);
+                InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
+                BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
+        ) {
+            String argsLine = bufferedReader.readLine();
             if (argsLine.isEmpty()) {
                 throw new RuntimeException("Args file is empty");
             }
@@ -312,10 +320,6 @@ public abstract class SparkApplication {
             if (infos != null) {
                 infos.jobEnd();
             }
-            if (ss != null && !ss.conf().get("spark.master").startsWith("local")) {
-                //JobMetricsUtils.unRegisterListener(ss);
-                ss.stop();
-            }
         }
     }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 557627d..66fa91d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.engine.spark.job;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.IOException;
 
@@ -38,8 +39,8 @@ import java.util.Map.Entry;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 
 import org.apache.hadoop.conf.Configuration;
@@ -169,11 +170,15 @@ public class NSparkExecutable extends AbstractExecutable {
     String dumpArgs() throws ExecuteException {
         File tmpDir = null;
         try {
-            tmpDir = File.createTempFile(MetadataConstants.P_SEGMENT_IDS, "");
-            FileUtils.writeByteArrayToFile(tmpDir, JsonUtil.writeValueAsBytes(getParams()));
+            String pathName = getId() + "_" + MetadataConstants.P_JOB_ID;
+            Path tgtPath = new Path(getConfig().getJobTmpDir(getParams().get("project")), pathName);
+            FileSystem fileSystem = FileSystem.get(tgtPath.toUri(), HadoopUtil.getCurrentConfiguration());
+            try (BufferedOutputStream outputStream = new BufferedOutputStream(fileSystem.create(tgtPath, false))) {
+                outputStream.write(JsonUtil.writeValueAsBytes(getParams()));
+            }
 
             logger.info("Spark job args json is : {}.", JsonUtil.writeValueAsString(getParams()));
-            return tmpDir.getCanonicalPath();
+            return tgtPath.toUri().toString();
         } catch (IOException e) {
             if (tmpDir != null && tmpDir.exists()) {
                 try {
@@ -382,6 +387,8 @@ public class NSparkExecutable extends AbstractExecutable {
         if (StringUtils.isNotBlank(sparkUploadFiles)) {
             sb.append("--files ").append(sparkUploadFiles).append(" ");
         }
+        sb.append("--principal ").append(config.getKerberosPrincipal()).append(" ");
+        sb.append("--keytab ").append(config.getKerberosKeytabPath()).append(" ");
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
index c168632..a4ddf25 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
+++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/spark/application/JobWorkSpace.scala
@@ -35,7 +35,11 @@ object JobWorkSpace extends Logging {
       val worker = new JobWorker(application, appArgs, eventLoop)
       val monitor = new JobMonitor(eventLoop)
       val workspace = new JobWorkSpace(eventLoop, monitor, worker)
-      System.exit(workspace.run())
+      if (System.getProperty("spark.master").equals("yarn") && System.getProperty("spark.submit.deployMode").equals("cluster")) {
+        workspace.run()
+      } else {
+        System.exit(workspace.run())
+      }
     } catch {
       case throwable: Throwable =>
         logError("Error occurred when init job workspace.", throwable)

[kylin] 03/03: add kerberos usage check before set spark submit conf

Posted by ya...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 25080df65fae347edc28ebf73167fae7689aa561
Author: tianhui5 <ti...@xiaomi.com>
AuthorDate: Mon Jun 21 14:45:04 2021 +0800

    add kerberos usage check before set spark submit conf
---
 .../java/org/apache/kylin/engine/spark/job/NSparkExecutable.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 66fa91d..b88b05a 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -387,8 +387,10 @@ public class NSparkExecutable extends AbstractExecutable {
         if (StringUtils.isNotBlank(sparkUploadFiles)) {
             sb.append("--files ").append(sparkUploadFiles).append(" ");
         }
-        sb.append("--principal ").append(config.getKerberosPrincipal()).append(" ");
-        sb.append("--keytab ").append(config.getKerberosKeytabPath()).append(" ");
+        if (config.isKerberosEnabled()) {
+            sb.append("--principal ").append(config.getKerberosPrincipal()).append(" ");
+            sb.append("--keytab ").append(config.getKerberosKeytabPath()).append(" ");
+        }
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, sparkSubmitCmd, getId(), jars, kylinJobJar,

[kylin] 02/03: force delete spark deployMode in NSparkLocalStep

Posted by ya...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yaqian pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ba3348039f2e39f4262d80792a7aa2fbc14604aa
Author: tianhui5 <ti...@xiaomi.com>
AuthorDate: Mon Jun 21 11:35:05 2021 +0800

    force delete spark deployMode in NSparkLocalStep
---
 .../main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java
index 1bd63ba..5a5dad5 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkLocalStep.java
@@ -28,7 +28,8 @@ import java.util.Set;
 public class NSparkLocalStep extends NSparkExecutable {
     private final static String[] excludedSparkConf = new String[] {"spark.executor.cores",
             "spark.executor.memoryOverhead", "spark.executor.extraJavaOptions",
-            "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath"};
+            "spark.executor.instances", "spark.executor.memory", "spark.executor.extraClassPath",
+            "spark.submit.deployMode"};
 
     @Override
     protected Set<String> getMetadataDumpList(KylinConfig config) {