Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:50 UTC

[kylin] 03/06: KYLIN-4813 Fix several bugs for executor-side log collection

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 86aae4dc1cd6f364c36e9f47db84b20956b2b911
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Sun Nov 22 22:04:30 2020 +0800

    KYLIN-4813 Fix several bugs for executor-side log collection
    
    - add --files to upload the user-defined log4j properties file
    - fix incorrect appender/layout class names in spark-executor-log4j.properties
    - fix spark.executor.extraJavaOptions being overwritten on the Driver side
    - fix security issue (HTTP response splitting)
    - code style cleanups, etc.
---
 build/conf/spark-executor-log4j.properties         |  4 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  6 +-
 .../src/main/resources/kylin-defaults.properties   |  6 +-
 .../kylin/job/execution/ExecutableManager.java     | 26 +++----
 .../common/logging/SparkExecutorHdfsAppender.java  | 24 +++++-
 .../engine/spark/application/SparkApplication.java |  9 ++-
 .../kylin/engine/spark/job/NSparkExecutable.java   | 87 +++++++++++++---------
 .../scala/org/apache/spark/sql/KylinSession.scala  |  4 +-
 .../kylin/rest/controller/JobController.java       | 25 +++----
 .../org/apache/kylin/rest/service/JobService.java  |  4 +-
 10 files changed, 110 insertions(+), 85 deletions(-)

diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
index 7cc5b04..fb5b7e3 100644
--- a/build/conf/spark-executor-log4j.properties
+++ b/build/conf/spark-executor-log4j.properties
@@ -27,7 +27,7 @@ log4j.appender.stderr.target=System.err
 log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
 
 
-log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsAppender
 
 log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
 log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
@@ -41,6 +41,6 @@ log4j.appender.hdfs.logQueueCapacity=5000
 #flushPeriod count as millis
 log4j.appender.hdfs.flushInterval=5000
 
-log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
 #Don't add line number (%L) as it's too costly!
 log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
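
For context: log4j 1.x resolves the ${...} placeholders in this file from JVM system
properties, which is why the matching -Dkylin.* options must be present in
spark.executor.extraJavaOptions. A minimal sketch of that resolution (illustrative value,
not taken from this patch):

    // log4j 1.x substitutes ${key} from system properties; the executor JVM therefore
    // needs to be started with e.g. -Dkylin.hdfs.working.dir=<hdfs-working-dir>
    System.setProperty("kylin.hdfs.working.dir", "hdfs:///kylin");   // illustrative
    String dir = org.apache.log4j.helpers.OptionConverter.substVars(
            "${kylin.hdfs.working.dir}", new java.util.Properties()); // -> "hdfs:///kylin"
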
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e2727fa..01dc374 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2863,8 +2863,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.query.intersect.separator", "|");
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
-    public String sparderFiles() {
+    /**
+     * Used to upload user-defined log4j configuration
+     */
+    public String sparkUploadFiles() {
         try {
             File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
                     "spark-executor-log4j.properties");
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 8fba3bc..858278f 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
-kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -269,7 +269,6 @@ kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -D
 # uncomment for HDP
 #kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-#kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
 
 ### SPARK QUERY ENGINE CONFIGS (a.k.a. Sparder Context) ###
 # Enlarge cores and memory to improve query performance in production env, please check https://cwiki.apache.org/confluence/display/KYLIN/User+Manual+4.X
@@ -287,11 +286,10 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
 #kylin.query.spark-conf.spark.sql.shuffle.partitions=40
 #kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
+kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project}
 # uncomment for HDP
 #kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-#kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
 
 ### QUERY PUSH DOWN ###
 
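
The job-specific -D flags removed from the static default above are now injected per job by
NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded (see the NSparkExecutable diff below), so
the effective executor options at submit time are expected to look roughly like the following
(placeholders, not literal values):

    # illustrative shape only; assembled at runtime by the job server
    spark.executor.extraJavaOptions=-Dkylin.spark.project=<project> -Dkylin.spark.jobName=<stepId> \
        -Dkylin.spark.identifier=<jobId> -Dkylin.metadata.identifier=<metadata-url-prefix> \
        -Dkylin.kerberos.enabled=false -Dkylin.hdfs.working.dir=<hdfs-working-dir> \
        -Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties \
        -Dlog4j.debug -Dkylin.spark.category=job
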
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 590276d..42a9c99 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.job.execution;
 
@@ -255,10 +255,6 @@ public class ExecutableManager {
         return result;
     }
 
-    public Output getOutputFromHDFSByJobId(String jobId) {
-        return getOutputFromHDFSByJobId(jobId, jobId);
-    }
-
     public Output getOutputFromHDFSByJobId(String jobId, String stepId) {
         return getOutputFromHDFSByJobId(jobId, stepId, 100);
     }
@@ -593,7 +589,7 @@ public class ExecutableManager {
                 logger.warn("The job " + jobId + " has been discarded.");
             }
             throw new IllegalStateException(
-                "The job " + job.getId() + " has already been finished and cannot be discarded.");
+                    "The job " + job.getId() + " has already been finished and cannot be discarded.");
         }
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -617,7 +613,7 @@ public class ExecutableManager {
             for (AbstractExecutable task : tasks) {
                 if (task.getId().compareTo(stepId) >= 0) {
                     logger.debug("rollback task : " + task);
-                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath());
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", task.getLogPath());
                 }
             }
         }
@@ -634,11 +630,11 @@ public class ExecutableManager {
         }
 
         if (!(job.getStatus() == ExecutableState.READY
-            || job.getStatus() == ExecutableState.RUNNING)) {
+                || job.getStatus() == ExecutableState.RUNNING)) {
             logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
-                + ". It's final state and cannot be transfer to be stopped!!!");
+                    + ". It's final state and cannot be transfer to be stopped!!!");
             throw new IllegalStateException(
-                "The job " + job.getId() + " has already been finished and cannot be stopped.");
+                    "The job " + job.getId() + " has already been finished and cannot be stopped.");
         }
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -662,7 +658,6 @@ public class ExecutableManager {
     }
 
     public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) {
-        // when 
         if (Thread.currentThread().isInterrupted()) {
             throw new RuntimeException("Current thread is interruptted, aborting");
         }
@@ -701,7 +696,6 @@ public class ExecutableManager {
         }
 
         if (project != null) {
-            //write output to HDFS
             updateJobOutputToHDFS(project, jobId, output, logPath);
         }
     }
@@ -715,7 +709,7 @@ public class ExecutableManager {
             jobOutput.setLogPath(logPath);
         }
         String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId);
-
+        logger.debug("Update JobOutput To HDFS for {} to {} [{}]", jobId, outputHDFSPath, jobOutput.getContent() != null ? jobOutput.getContent().length() : -1);
         updateJobOutputToHDFS(outputHDFSPath, jobOutput);
     }
 
@@ -786,7 +780,7 @@ public class ExecutableManager {
                 if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
                     continue;
                 } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
-                    updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null);
+                    updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", null);
                 }
                 break;
             }
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index 4a76022..381fd2d 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.kylin.engine.spark.common.logging;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -111,7 +129,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
 
     @Override
     String getAppenderName() {
-        return "SparkExecutorHdfsLogAppender";
+        return "SparkExecutorHdfsAppender";
     }
 
     @Override
@@ -164,6 +182,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
             String rollingDir = dateFormat.format(new Date(event.getTimeStamp()));
             outPutPath = getOutPutDir(rollingDir);
         }
+        LogLog.warn("Update to " + outPutPath);
     }
 
     private String getOutPutDir(String rollingDir) {
@@ -236,8 +255,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     }
 
     private String parseHdfsWordingDir() {
-        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
-                + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/");
     }
 }
 
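
For reference, the hdfsWorkingDir and metadataIdentifier keys set in
spark-executor-log4j.properties reach this appender through standard log4j 1.x bean-style
property injection; a minimal sketch of that wiring (the actual fields and setters live in
the appender classes and are not part of this hunk):

    // "log4j.appender.hdfs.hdfsWorkingDir=..." is applied by calling setHdfsWorkingDir(...)
    private String hdfsWorkingDir;
    public void setHdfsWorkingDir(String hdfsWorkingDir) { this.hdfsWorkingDir = hdfsWorkingDir; }
    public String getHdfsWorkingDir() { return hdfsWorkingDir; }
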
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 84737cb..4ec461a 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -135,8 +135,13 @@ public abstract class SparkApplication {
                 }
                 if (!config.getSparkConfigOverride().isEmpty()) {
                     for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
-                        logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
-                        sparkConf.set(entry.getKey(), entry.getValue());
+                        if (entry.getKey().contains("spark.executor.extraJavaOptions")) {
+                            // Let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded (in JobServer) determine the executor's JVM-level configuration
+                            logger.info("Do not override {}={}.", entry.getKey(), entry.getValue());
+                        } else {
+                            logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
+                            sparkConf.set(entry.getKey(), entry.getValue());
+                        }
                     }
                 }
             } else if (!isJobOnCluster(sparkConf)) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 3cc1b60..987c215 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -122,8 +122,8 @@ public class NSparkExecutable extends AbstractExecutable {
         String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
         logger.info("write hadoop conf is {} ", config.getBuildConf());
         if (!config.getBuildConf().isEmpty()) {
-               logger.info("write hadoop conf is {} ", config.getBuildConf());
-               hadoopConf = config.getBuildConf();
+            logger.info("write hadoop conf is {} ", config.getBuildConf());
+            hadoopConf = config.getBuildConf();
         }
         if (StringUtils.isEmpty(hadoopConf) && !config.isUTEnv() && !config.isZKLocal()) {
             throw new RuntimeException(
@@ -203,7 +203,7 @@ public class NSparkExecutable extends AbstractExecutable {
         String project = getParam(MetadataConstants.P_PROJECT_NAME);
         Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
 
-        HashMap<String, String> jobOverrides = new HashMap();
+        HashMap<String, String> jobOverrides = new HashMap<>();
         String parentId = getParentId();
         jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
         jobOverrides.put("job.project", project);
@@ -250,7 +250,7 @@ public class NSparkExecutable extends AbstractExecutable {
     }
 
     private ExecuteResult runSparkSubmit(KylinConfig config, String hadoopConf, String jars,
-            String kylinJobJar, String appArgs, String jobId) {
+                                         String kylinJobJar, String appArgs, String jobId) {
         PatternedLogger patternedLogger;
         if (config.isJobLogPrintEnabled()) {
             patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
@@ -302,16 +302,21 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
-        replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride);
         return sparkConfigOverride;
     }
 
-    private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) {
-        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
+    /**
+     * Add properties to spark.xxx.extraJavaOptions for AbstractHdfsLogAppender.
+     * Please check the following for details:
+     * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties
+     * 2. AbstractHdfsLogAppender
+     */
+    private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config,
+                                                     Map<String, String> sparkConfigOverride) {
+        String sparkNodeExtraJavaOptionsKey = isDriver ? "spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions";
         StringBuilder sb = new StringBuilder();
-        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
-            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
-        }
 
         String serverIp = "127.0.0.1";
         try {
@@ -320,31 +325,37 @@ public class NSparkExecutable extends AbstractExecutable {
             logger.warn("use the InetAddress get local ip failed!", e);
         }
         String serverPort = config.getServerPort();
-
         String hdfsWorkingDir = config.getHdfsWorkingDirectory();
-
-        String sparkDriverHdfsLogPath = null;
-        if (config instanceof KylinConfigExt) {
-            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-            if (Objects.nonNull(extendedOverrides)) {
-                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+        String log4jConfiguration;
+
+        if (isDriver) {
+            String sparkDriverHdfsLogPath;
+            if (config instanceof KylinConfigExt) {
+                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+                if (Objects.nonNull(extendedOverrides)) {
+                    sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+                    sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+                }
             }
-        }
-
-        /*
-        if (config.isCloud()) {
-            String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory();
-            if (StringUtils.isNotBlank(logLocalWorkingDirectory)) {
-                hdfsWorkingDir = logLocalWorkingDirectory;
-                sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
+            log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
+            sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
+        } else {
+            if (config instanceof KylinConfigExt) {
+                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+                if (Objects.nonNull(extendedOverrides)) {
+                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project")));
+                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId")));
+                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id")));
+                }
             }
+            sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix()));
         }
-         */
 
-        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
-        sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
         sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
-
         if (config.isKerberosEnabled()) {
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath()));
@@ -354,16 +365,17 @@ public class NSparkExecutable extends AbstractExecutable {
             }
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
-        sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
+
+        if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) {
+            sb.append(" ").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey));
+        }
+        logger.debug("Final property is set to <<{}>> for {} .", sb.toString(), sparkNodeExtraJavaOptionsKey);
+        // Here replace original Java options for Spark node
+        sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString());
     }
 
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
-            String appArgs) {
+                                      String appArgs) {
         StringBuilder sb = new StringBuilder();
         sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
 
@@ -377,11 +389,12 @@ public class NSparkExecutable extends AbstractExecutable {
         if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) {
             jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars");
         }
-
+        sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, KylinConfig.getSparkHome(), getId(), jars, kylinJobJar,
                 appArgs);
+        // SparkConf still has a chance to be changed in CubeBuildJob.java (Spark Driver)
         logger.info("spark submit cmd: {}", cmd);
         return cmd;
     }
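
With the --files addition above, the submitted command now also ships the executor-side
log4j file to the containers. An illustrative (not verbatim) shape of the generated command,
using placeholder paths; the --conf flags come from the overridden spark config map and are
not shown in this hunk:

    export HADOOP_CONF_DIR=<hadoop-conf-dir> && <spark-home>/bin/spark-submit \
        --class org.apache.kylin.engine.spark.application.SparkEntry \
        --conf 'spark.executor.extraJavaOptions=... -Dlog4j.configuration=spark-executor-log4j.properties ...' \
        --files <kylin-home>/conf/spark-executor-log4j.properties \
        --name job_step_<jobId> \
        --jars <jars> <kylin-job-jar> <app-args>
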
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
index fb09d38..7ae0937 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -174,10 +174,10 @@ object KylinSession extends Logging {
         if (sparkConf.get("spark.master").startsWith("yarn")) {
           sparkConf.set("spark.yarn.dist.jars",
             KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath)
-          sparkConf.set("spark.yarn.dist.files", conf.sparderFiles())
+          sparkConf.set("spark.yarn.dist.files", conf.sparkUploadFiles())
         } else {
           sparkConf.set("spark.jars", conf.sparderJars)
-          sparkConf.set("spark.files", conf.sparderFiles())
+          sparkConf.set("spark.files", conf.sparkUploadFiles())
         }
 
         val fileName = KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index c28362c..cd63ac7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -20,12 +20,14 @@ package org.apache.kylin.rest.controller;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Locale;
 
+import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
@@ -164,29 +166,20 @@ public class JobController extends BasicController {
 
     /**
      * Get a job step output
-     * 
-     * @return
-     * @throws IOException
      */
-
     @RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
             "application/json" })
     @ResponseBody
     public Map<String, String> getStepOutput(@PathVariable String jobId, @PathVariable String stepId) {
-        Map<String, String> result = new HashMap<String, String>();
+        Map<String, String> result = new HashMap<>();
         result.put("jobId", jobId);
         result.put("stepId", String.valueOf(stepId));
-        result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
+        result.put("cmd_output", jobService.getJobStepOutput(jobId, stepId));
         return result;
     }
 
     /**
-     * Download a job step output from hdfs
-     * @param jobId
-     * @param stepId
-     * @param project
-     * @param response
-     * @return
+     * Download a step output (Spark driver log) from HDFS
      */
     @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
@@ -196,10 +189,12 @@ public class JobController extends BasicController {
         checkRequiredArg("job_id", jobId);
         checkRequiredArg("step_id", stepId);
         checkRequiredArg("project", project);
-        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+        String validatedPrj =  CliCommandExecutor.checkParameter(project);
+        String validatedStepId =  CliCommandExecutor.checkParameter(stepId);
+        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", validatedPrj, validatedStepId);
 
-        String jobOutput = jobService.getAllJobOutput(jobId, stepId);
-        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+        String jobOutput = jobService.getAllJobStepOutput(jobId, stepId);
+        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes(StandardCharsets.UTF_8)), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
         return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
     }
 
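
The response-splitting fix validates the user-supplied project and stepId before they are
embedded in the download filename (and hence in the Content-Disposition header). The exact
rules of CliCommandExecutor.checkParameter are not part of this patch; a minimal sketch of
the kind of check it is assumed to perform:

    // Sketch only, not the actual CliCommandExecutor implementation: restrict the value to a
    // conservative whitelist so CR/LF and other header-breaking characters never reach the
    // "<project>_<stepId>.log" filename written into the HTTP response.
    static String checkParameter(String value) {
        if (value == null || !value.matches("[0-9A-Za-z_.\\-]+")) {
            throw new IllegalArgumentException("Invalid parameter: " + value);
        }
        return value;
    }
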
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 2c0c20c..90ee782 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -480,12 +480,12 @@ public class JobService extends BasicService implements InitializingBean {
         return getExecutableManager().getOutput(id);
     }
 
-    public String getJobOutput(String jobId, String stepId) {
+    public String getJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
     }
 
-    public String getAllJobOutput(String jobId, String stepId) {
+    public String getAllJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
     }