Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:47 UTC

[kylin] branch kylin-on-parquet-v2 updated (c7e08d6 -> 51ff5ec)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from c7e08d6  KYLIN-4801 Add IT sql
     new 2dca9ef  KYLIN-4813 Refactor spark build log
     new c0b2787  KYLIN-4813 Add spark executor log4j
     new 86aae4d  KYLIN-4813 Fix several bugs for executor-side log collection
     new c4d3168  KYLIN-4813 Add download all log link in front-end
     new 9a4df78  KYLIN-4813 Some adjustments for executor-side log collection
     new 51ff5ec  KYLIN-4825 Add yarn tracking url on job step page

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../engine/mr/common/MapReduceExecutable.java      |   7 +-
 build/bin/kylin-port-replace-util.sh               |   1 +
 build/conf/kylin-parquet-log4j.properties          |  33 --
 build/conf/kylin-spark-log4j.properties            |  43 ---
 build/conf/spark-driver-log4j.properties           |  45 +++
 build/conf/spark-executor-log4j.properties         |  46 +++
 .../org/apache/kylin/common/KylinConfigBase.java   |  58 +++-
 .../org/apache/kylin/common/KylinConfigExt.java    |  18 +-
 .../common/logging/SensitivePatternLayout.java     |  62 ++++
 .../src/main/resources/kylin-defaults.properties   |   7 +-
 .../apache/kylin/job/dao/ExecutableOutputPO.java   |  11 +
 .../kylin/job/execution/AbstractExecutable.java    |  17 +-
 .../job/execution/DefaultChainedExecutable.java    |  18 +-
 .../kylin/job/execution/ExecutableManager.java     | 274 +++++++++++++--
 .../apache/kylin/job/ExecutableManagerTest.java    |  20 +-
 .../job/impl/threadpool/DefaultSchedulerTest.java  |   4 +-
 .../common/logging/AbstractHdfsLogAppender.java    | 377 +++++++++++++++++++++
 .../common/logging/SparkDriverHdfsLogAppender.java | 112 ++++++
 .../common/logging/SparkExecutorHdfsAppender.java  | 261 ++++++++++++++
 .../engine/spark/application/SparkApplication.java | 106 ++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   | 108 +++---
 .../kylin/engine/spark/utils/MetaDumpUtil.java     |   3 +-
 .../scala/org/apache/spark/sql/KylinSession.scala  |   4 +-
 .../kylin/rest/controller/BasicController.java     |  16 +
 .../kylin/rest/controller/JobController.java       |  53 ++-
 ...leRequestV2.java => SparkJobUpdateRequest.java} |  45 ++-
 .../org/apache/kylin/rest/service/JobService.java  |  24 ++
 server/src/main/resources/kylinSecurity.xml        |   1 +
 webapp/app/js/controllers/job.js                   |  10 +-
 webapp/app/partials/jobs/job_steps.html            |   1 +
 30 files changed, 1566 insertions(+), 219 deletions(-)
 delete mode 100644 build/conf/kylin-parquet-log4j.properties
 delete mode 100644 build/conf/kylin-spark-log4j.properties
 create mode 100644 build/conf/spark-driver-log4j.properties
 create mode 100644 build/conf/spark-executor-log4j.properties
 create mode 100644 core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
 create mode 100644 kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
 create mode 100644 kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
 create mode 100644 kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
 copy server-base/src/main/java/org/apache/kylin/rest/request/{HiveTableRequestV2.java => SparkJobUpdateRequest.java} (68%)


[kylin] 05/06: KYLIN-4813 Some adjustments for executor-side log collection

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 9a4df78f291cd7e34a30cbbdd9ec43124e6cd46f
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Tue Nov 24 14:12:06 2020 +0800

    KYLIN-4813 Some adjustments for executor-side log collection
---
 .../org/apache/kylin/common/KylinConfigBase.java   | 18 ++++---
 .../org/apache/kylin/common/KylinConfigExt.java    | 18 +++++--
 .../src/main/resources/kylin-defaults.properties   |  2 +-
 .../common/logging/SparkExecutorHdfsAppender.java  |  2 +-
 .../engine/spark/application/SparkApplication.java |  9 +---
 .../kylin/engine/spark/job/NSparkExecutable.java   | 61 ++++++++--------------
 .../kylin/engine/spark/utils/MetaDumpUtil.java     |  3 +-
 7 files changed, 53 insertions(+), 60 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 01dc374..964eb64 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -181,8 +181,8 @@ public abstract class KylinConfigBase implements Serializable {
     protected String getOptional(String prop, String dft) {
 
         final String property = System.getProperty(prop);
-        return property != null ? StrSubstitutor.replace(property, System.getenv())
-                : StrSubstitutor.replace(properties.getProperty(prop, dft), System.getenv());
+        return property != null ? getSubstitutor().replace(property, System.getenv())
+                : getSubstitutor().replace(properties.getProperty(prop, dft), System.getenv());
     }
 
     protected Properties getAllProperties() {
@@ -194,8 +194,7 @@ public abstract class KylinConfigBase implements Serializable {
      * @return properties which contained in propertyKeys
      */
     protected Properties getProperties(Collection<String> propertyKeys) {
-        Map<String, String> envMap = System.getenv();
-        StrSubstitutor sub = new StrSubstitutor(envMap);
+        final StrSubstitutor sub = getSubstitutor();
 
         Properties filteredProperties = new Properties();
         for (Entry<Object, Object> entry : this.properties.entrySet()) {
@@ -206,9 +205,17 @@ public abstract class KylinConfigBase implements Serializable {
         return filteredProperties;
     }
 
+    protected StrSubstitutor getSubstitutor() {
+        // env > properties
+        final Map<String, Object> all = Maps.newHashMap();
+        all.putAll((Map) properties);
+        all.putAll(System.getenv());
+
+        return new StrSubstitutor(all);
+    }
+
     protected Properties getRawAllProperties() {
         return properties;
-
     }
 
     protected final Map<String, String> getPropertiesByPrefix(String prefix) {
@@ -259,7 +266,6 @@ public abstract class KylinConfigBase implements Serializable {
     final protected void reloadKylinConfig(Properties properties) {
         this.properties = BCC.check(properties);
         setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
-        setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
         setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
     }
 
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
index ddec154..fda4100 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -18,11 +18,12 @@
 
 package org.apache.kylin.common;
 
+import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.kylin.shaded.com.google.common.collect.Maps;
+
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.lang3.text.StrSubstitutor;
-
 /**
  * Extends a KylinConfig with additional overrides.
  */
@@ -59,7 +60,7 @@ public class KylinConfigExt extends KylinConfig {
     public String getOptional(String prop, String dft) {
         String value = overrides.get(prop);
         if (value != null)
-            return   StrSubstitutor.replace(value, System.getenv());
+            return   getSubstitutor().replace(value, System.getenv());
         else
             return super.getOptional(prop, dft);
     }
@@ -67,11 +68,20 @@ public class KylinConfigExt extends KylinConfig {
     @Override
     protected Properties getAllProperties() {
         Properties result = new Properties();
-        result.putAll(super.getRawAllProperties());
+        result.putAll(super.getAllProperties());
         result.putAll(overrides);
         return result;
     }
 
+    @Override
+    protected StrSubstitutor getSubstitutor() {
+        final Map<String, Object> all = Maps.newHashMap();
+        all.putAll((Map) properties);
+        all.putAll(System.getenv());
+        all.putAll(overrides);
+        return new StrSubstitutor(all);
+    }
+
     public Map<String, String> getExtendedOverrides() {
         return overrides;
     }
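
The two getSubstitutor() implementations above define the placeholder lookup order: KylinConfigBase layers environment variables over the loaded kylin.properties (so an environment variable wins over a same-named property), and KylinConfigExt layers its per-entity overrides on top of both. A minimal, self-contained sketch of that behaviour (not Kylin code; the sample value is invented):

    import org.apache.commons.lang.text.StrSubstitutor;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class SubstitutorSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("kylin.metadata.url.identifier", "kylin_metadata"); // invented sample value

            Map<String, Object> all = new HashMap<>();
            all.putAll((Map) props);        // properties first
            all.putAll(System.getenv());    // environment second, so env > properties
            // KylinConfigExt additionally does: all.putAll(overrides);  -> overrides win over both

            StrSubstitutor sub = new StrSubstitutor(all);
            // prints "-Dkylin.metadata.identifier=kylin_metadata"
            System.out.println(sub.replace("-Dkylin.metadata.identifier=${kylin.metadata.url.identifier}"));
        }
    }
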
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 858278f..c5cd317 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
-kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${hdfs.working.dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
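
The placeholders in the spark.executor.extraJavaOptions value above are resolved through that same substitutor before the job is submitted: kylin.metadata.url.identifier comes from the reloaded properties, while hdfs.working.dir, job.project, job.id, job.stepId and user.timezone come from the job-level overrides that NSparkExecutable assembles (see its hunk below). A hedged sketch of the rendering, with invented sample values:

    import org.apache.commons.lang.text.StrSubstitutor;

    import java.util.HashMap;
    import java.util.Map;

    public class ExecutorOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> values = new HashMap<>(); // all values invented, for illustration only
            values.put("hdfs.working.dir", "hdfs:///kylin/kylin_metadata");
            values.put("kylin.metadata.url.identifier", "kylin_metadata");
            values.put("job.project", "learn_kylin");
            values.put("job.id", "sample-job-id");
            values.put("job.stepId", "sample-job-id-01");
            values.put("user.timezone", "GMT+8");

            String template = "-Dkylin.hdfs.working.dir=${hdfs.working.dir}"
                    + " -Dkylin.metadata.identifier=${kylin.metadata.url.identifier}"
                    + " -Dkylin.spark.project=${job.project}"
                    + " -Dkylin.spark.identifier=${job.id}"
                    + " -Dkylin.spark.jobName=${job.stepId}"
                    + " -Duser.timezone=${user.timezone}";

            // prints the fully expanded -D flags the executor JVM will receive
            System.out.println(new StrSubstitutor(values).replace(template));
        }
    }
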
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index d8b250f..b2e4146 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -229,7 +229,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     @VisibleForTesting
     String getRootPathName() {
         if ("job".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs";
+            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs/executor/";
         } else if ("sparder".equals(getCategory())) {
             return parseHdfsWordingDir() + "/_sparder_logs";
         } else {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 4ec461a..84737cb 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -135,13 +135,8 @@ public abstract class SparkApplication {
                 }
                 if (!config.getSparkConfigOverride().isEmpty()) {
                     for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
-                        if (entry.getKey().contains("spark.executor.extraJavaOptions")) {
-                            // Just let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded(in JobServer) to determine executor's JVM level configuration
-                            logger.info("Do not override {}={}.", entry.getKey(), entry.getValue());
-                        } else {
-                            logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
-                            sparkConf.set(entry.getKey(), entry.getValue());
-                        }
+                        logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
+                        sparkConf.set(entry.getKey(), entry.getValue());
                     }
                 }
             } else if (!isJobOnCluster(sparkConf)) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 987c215..7cd178c 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -163,7 +163,7 @@ public class NSparkExecutable extends AbstractExecutable {
     void attachMetadataAndKylinProps(KylinConfig config) throws IOException {
         // The way of Updating metadata is CopyOnWrite. So it is safe to use Reference in the value.
         Set<String> dumpList = getMetadataDumpList(config);
-        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, KylinConfigExt.createInstance(config, new HashMap<>()), getDistMetaUrl());
+        MetaDumpUtil.dumpAndUploadKylinPropsAndMetadata(dumpList, config, getDistMetaUrl());
     }
 
     String dumpArgs() throws ExecuteException {
@@ -211,6 +211,7 @@ public class NSparkExecutable extends AbstractExecutable {
             jobOverrides.put("job.stepId", getId());
         }
         jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("hdfs.working.dir", KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
         jobOverrides.put("spark.driver.log4j.appender.hdfs.File",
                 Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
 
@@ -302,8 +303,7 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
-        replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride);
-        replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(config, sparkConfigOverride);
         return sparkConfigOverride;
     }
 
@@ -313,11 +313,13 @@ public class NSparkExecutable extends AbstractExecutable {
      * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties
      * 2. AbstractHdfsLogAppender
      */
-    private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config,
+    private void replaceSparkNodeJavaOpsConfIfNeeded(KylinConfig config,
                                                      Map<String, String> sparkConfigOverride) {
-        String sparkNodeExtraJavaOptionsKey = isDriver ? "spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions";
+        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
         StringBuilder sb = new StringBuilder();
-
+        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
+            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        }
         String serverIp = "127.0.0.1";
         try {
             serverIp = InetAddress.getLocalHost().getHostAddress();
@@ -326,35 +328,17 @@ public class NSparkExecutable extends AbstractExecutable {
         }
         String serverPort = config.getServerPort();
         String hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        String log4jConfiguration;
-
-        if (isDriver) {
-            String sparkDriverHdfsLogPath;
-            if (config instanceof KylinConfigExt) {
-                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-                if (Objects.nonNull(extendedOverrides)) {
-                    sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
-                    sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
-                }
-            }
-            log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
-            sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-            sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
-        } else {
-            if (config instanceof KylinConfigExt) {
-                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-                if (Objects.nonNull(extendedOverrides)) {
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project")));
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId")));
-                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id")));
-                }
+
+        String sparkDriverHdfsLogPath = null;
+        if (config instanceof KylinConfigExt) {
+            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+            if (Objects.nonNull(extendedOverrides)) {
+                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
             }
-            sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix()));
         }
 
+        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
+        sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
         sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
         if (config.isKerberosEnabled()) {
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
@@ -365,13 +349,12 @@ public class NSparkExecutable extends AbstractExecutable {
             }
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
-
-        if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) {
-            sb.append(" ").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey));
-        }
-        logger.debug("Final property is set to <<{}>> for {} .", sb.toString(), sparkNodeExtraJavaOptionsKey);
-        // Here replace original Java options for Spark node
-        sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString());
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
+        sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
     }
 
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
index 32ca143..c2ef2c9 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/utils/MetaDumpUtil.java
@@ -29,7 +29,6 @@ import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.persistence.AutoDeleteDirectory;
 import org.apache.kylin.common.persistence.RawResource;
@@ -64,7 +63,7 @@ public class MetaDumpUtil {
         return dumpList;
     }
 
-    public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig,
+    public static void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfig kylinConfig,
             String metadataUrl) throws IOException {
 
         try (AutoDeleteDirectory tmpDir = new AutoDeleteDirectory("kylin_job_meta", "");


[kylin] 02/06: KYLIN-4813 Add spark executor log4j

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c0b27873f7ff6cfcaa4d404214c41288bd81a4f4
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Nov 19 10:24:18 2020 +0800

    KYLIN-4813 Add spark executor log4j
    
    KYLIN-4813 Minor fix
---
 build/conf/spark-executor-log4j.properties         |  46 ++++
 .../org/apache/kylin/common/KylinConfigBase.java   |   6 +
 .../src/main/resources/kylin-defaults.properties   |   2 +
 .../kylin/job/execution/AbstractExecutable.java    |   2 +-
 .../kylin/job/execution/ExecutableManager.java     |   1 -
 .../common/logging/SparkExecutorHdfsAppender.java  | 243 +++++++++++++++++++++
 6 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
new file mode 100644
index 0000000..7cc5b04
--- /dev/null
+++ b/build/conf/spark-executor-log4j.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# It's called spark-executor-log4j.properties so that it won't distract users from the other more important log4j config file: kylin-server-log4j.properties
+# enable this by -Dlog4j.configuration=spark-executor-log4j.properties
+log4j.rootLogger=INFO,stderr,hdfs
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.stderr.target=System.err
+#Don't add line number (%L) as it's too costly!
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender
+
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
+log4j.appender.hdfs.category=${kylin.spark.category}
+log4j.appender.hdfs.identifier=${kylin.spark.identifier}
+log4j.appender.hdfs.jobName=${kylin.spark.jobName}
+log4j.appender.hdfs.project=${kylin.spark.project}
+
+log4j.appender.hdfs.rollingPeriod=5
+log4j.appender.hdfs.logQueueCapacity=5000
+#flushPeriod count as millis
+log4j.appender.hdfs.flushInterval=5000
+
+log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
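
The ${kylin.hdfs.working.dir}, ${kylin.metadata.identifier} and ${kylin.spark.*} placeholders in this appender configuration are expanded by log4j 1.x from JVM system properties, which is why the executor options in kylin-defaults.properties pass them as -D flags. A minimal sketch of that wiring (not Kylin code; the values are invented, and on a real executor the file is picked up via -Dlog4j.configuration rather than loaded by hand):

    import org.apache.log4j.Logger;
    import org.apache.log4j.PropertyConfigurator;

    public class ExecutorLog4jSketch {
        public static void main(String[] args) {
            // Stand-ins for the -D flags normally set in spark.executor.extraJavaOptions.
            System.setProperty("kylin.hdfs.working.dir", "hdfs:///kylin/kylin_metadata");
            System.setProperty("kylin.metadata.identifier", "kylin_metadata");
            System.setProperty("kylin.spark.category", "job");
            System.setProperty("kylin.spark.identifier", "application_0000_0001");
            System.setProperty("kylin.spark.jobName", "step-01");
            System.setProperty("kylin.spark.project", "learn_kylin");

            // Loads the appender config; log4j substitutes the ${...} tokens from system properties.
            PropertyConfigurator.configure("spark-executor-log4j.properties");
            Logger.getLogger(ExecutorLog4jSketch.class).info("executor logging initialised");
        }
    }
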
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 081d23f..e2727fa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -260,6 +260,7 @@ public abstract class KylinConfigBase implements Serializable {
         this.properties = BCC.check(properties);
         setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
         setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
+        setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
     }
 
     private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -2829,6 +2830,11 @@ public abstract class KylinConfigBase implements Serializable {
         return getLogPropertyFile("spark-driver-log4j.properties");
     }
 
+    public String getLogSparkExecutorPropertiesFile() {
+        return getLogPropertyFile("spark-executor-log4j.properties");
+    }
+
+
     private String getLogPropertyFile(String filename) {
         if (isDevEnv()) {
             return Paths.get(getKylinHomeWithoutWarn(), "build", "conf").toString() + File.separator + filename;
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 3d8268f..8fba3bc 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,6 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -286,6 +287,7 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
 #kylin.query.spark-conf.spark.sql.shuffle.partitions=40
 #kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
 
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
 # uncomment for HDP
 #kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d815e5b..a2fe7e5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -596,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     public final String getProject() {
         if (project == null) {
-            logger.error("project is not set for abstract executable " + getId());
+            throw new IllegalStateException("project is not set for abstract executable " + getId());
         }
         return project;
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index c9ae8a4..590276d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -382,7 +382,6 @@ public class ExecutableManager {
 
     /**
      * get the last N_LINES lines from the end of hdfs file input stream;
-     * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
      *
      * @param hdfsDin
      * @param startPos
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
new file mode 100644
index 0000000..4a76022
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -0,0 +1,243 @@
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+
+public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
+
+    private static final long A_DAY_MILLIS = 24 * 60 * 60 * 1000L;
+    private static final long A_HOUR_MILLIS = 60 * 60 * 1000L;
+    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault());
+    private SimpleDateFormat hourFormat = new SimpleDateFormat("HH");
+
+    @VisibleForTesting
+    String outPutPath;
+    @VisibleForTesting
+    String executorId;
+
+    @VisibleForTesting
+    long startTime = 0;
+    @VisibleForTesting
+    boolean rollingByHour = false;
+    @VisibleForTesting
+    int rollingPeriod = 5;
+
+    //log appender configurable
+    private String metadataIdentifier;
+    private String category;
+
+    private String identifier;
+
+    // only cubing job
+    private String jobName;
+    private String project;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public void setIdentifier(String identifier) {
+        this.identifier = identifier;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setMetadataIdentifier(String metadataIdentifier) {
+        this.metadataIdentifier = metadataIdentifier;
+    }
+
+    public String getMetadataIdentifier() {
+        return metadataIdentifier;
+    }
+
+    @Override
+    void init() {
+        if (StringUtils.isBlank(this.identifier)) {
+            this.identifier = YarnSparkHadoopUtil.getContainerId().getApplicationAttemptId().getApplicationId()
+                    .toString();
+        }
+
+        LogLog.warn("metadataIdentifier -> " + getMetadataIdentifier());
+        LogLog.warn("category -> " + getCategory());
+        LogLog.warn("identifier -> " + getIdentifier());
+
+        if (null != getProject()) {
+            LogLog.warn("project -> " + getProject());
+        }
+
+        if (null != getJobName()) {
+            LogLog.warn("jobName -> " + getJobName());
+        }
+    }
+
+    @Override
+    String getAppenderName() {
+        return "SparkExecutorHdfsLogAppender";
+    }
+
+    @Override
+    boolean isSkipCheckAndFlushLog() {
+        if (SparkEnv.get() == null && StringUtils.isBlank(executorId)) {
+            LogLog.warn("Waiting for spark executor to start");
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LogLog.error("Waiting for spark executor starting is interrupted!", e);
+                Thread.currentThread().interrupt();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    void doWriteLog(int size, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+        while (size > 0) {
+            final LoggingEvent loggingEvent = getLogBufferQue().take();
+            if (isTimeChanged(loggingEvent)) {
+                updateOutPutDir(loggingEvent);
+
+                final Path file = new Path(outPutPath);
+
+                String sparkuser = System.getenv("SPARK_USER");
+                String user = System.getenv("USER");
+                LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
+                        + " USER is " + user);
+                if (!initHdfsWriter(file, new Configuration())) {
+                    LogLog.error("Failed to init the hdfs writer!");
+                }
+                doRollingClean(loggingEvent);
+            }
+
+            transaction.add(loggingEvent);
+            writeLogEvent(loggingEvent);
+            size--;
+        }
+    }
+
+    @VisibleForTesting
+    void updateOutPutDir(LoggingEvent event) {
+        if (rollingByHour) {
+            String rollingDir = dateFormat.format(new Date(event.getTimeStamp())) + "/"
+                    + hourFormat.format(new Date(event.getTimeStamp()));
+            outPutPath = getOutPutDir(rollingDir);
+        } else {
+            String rollingDir = dateFormat.format(new Date(event.getTimeStamp()));
+            outPutPath = getOutPutDir(rollingDir);
+        }
+    }
+
+    private String getOutPutDir(String rollingDir) {
+        if (StringUtils.isBlank(executorId)) {
+            executorId = SparkEnv.get() != null ? SparkEnv.get().executorId() : UUID.randomUUID().toString();
+            LogLog.warn("executorId set to " + executorId);
+        }
+
+        if ("job".equals(getCategory())) {
+            return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + getJobName() + "/" + "executor-"
+                    + executorId + ".log";
+        }
+        return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + "executor-" + executorId + ".log";
+    }
+
+    @VisibleForTesting
+    void doRollingClean(LoggingEvent event) throws IOException {
+        FileSystem fileSystem = getFileSystem();
+
+        String rootPathName = getRootPathName();
+        Path rootPath = new Path(rootPathName);
+
+        if (!fileSystem.exists(rootPath))
+            return;
+
+        FileStatus[] logFolders = fileSystem.listStatus(rootPath);
+
+        if (logFolders == null)
+            return;
+
+        String thresholdDay = dateFormat.format(new Date(event.getTimeStamp() - A_DAY_MILLIS * rollingPeriod));
+
+        for (FileStatus fs : logFolders) {
+            String fileName = fs.getPath().getName();
+            if (fileName.compareTo(thresholdDay) < 0) {
+                Path fullPath = new Path(rootPathName + File.separator + fileName);
+                if (!fileSystem.exists(fullPath))
+                    continue;
+                fileSystem.delete(fullPath, true);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    String getRootPathName() {
+        if ("job".equals(getCategory())) {
+            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs";
+        } else if ("sparder".equals(getCategory())) {
+            return parseHdfsWordingDir() + "/_sparder_logs";
+        } else {
+            throw new IllegalArgumentException("illegal category: " + getCategory());
+        }
+    }
+
+    @VisibleForTesting
+    boolean isTimeChanged(LoggingEvent event) {
+        if (rollingByHour) {
+            return isNeedRolling(event, A_HOUR_MILLIS);
+        } else {
+            return isNeedRolling(event, A_DAY_MILLIS);
+        }
+    }
+
+    private boolean isNeedRolling(LoggingEvent event, Long timeInterval) {
+        if (0 == startTime || ((event.getTimeStamp() / timeInterval) - (startTime / timeInterval)) > 0) {
+            startTime = event.getTimeStamp();
+            return true;
+        }
+        return false;
+    }
+
+    private String parseHdfsWordingDir() {
+        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
+                + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+    }
+}
+
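
The isNeedRolling()/isTimeChanged() pair above rolls the HDFS log file whenever an event's timestamp falls into a later day bucket (or hour bucket, when rollingByHour is set) than the previous roll. A standalone sketch of that check with the same day constant (not Kylin code; the timestamps are arbitrary):

    public class RollingCheckSketch {
        private static final long A_DAY_MILLIS = 24 * 60 * 60 * 1000L;
        private long startTime = 0;

        // Mirrors isNeedRolling(): roll when the event lands in a later interval bucket.
        boolean isNeedRolling(long eventTimeMillis, long timeInterval) {
            if (startTime == 0 || (eventTimeMillis / timeInterval) - (startTime / timeInterval) > 0) {
                startTime = eventTimeMillis;
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            RollingCheckSketch sketch = new RollingCheckSketch();
            long t = 1_606_000_000_000L;                                              // some instant on day N
            System.out.println(sketch.isNeedRolling(t, A_DAY_MILLIS));                // true  (first event always rolls)
            System.out.println(sketch.isNeedRolling(t + 60_000, A_DAY_MILLIS));       // false (same day bucket)
            System.out.println(sketch.isNeedRolling(t + A_DAY_MILLIS, A_DAY_MILLIS)); // true  (next day bucket)
        }
    }
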


[kylin] 03/06: KYLIN-4813 Fix several bugs for executor-side log collection

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 86aae4dc1cd6f364c36e9f47db84b20956b2b911
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Sun Nov 22 22:04:30 2020 +0800

    KYLIN-4813 Fix several bugs for executor-side log collection
    
    - add --files to upload user-defined log4j properties
    - fix ClassName error
    - fix executor.extraJavaOptions being overwritten on the driver side
    - fix security issue (HTTP response splitting)
    - code style etc.
---
 build/conf/spark-executor-log4j.properties         |  4 +-
 .../org/apache/kylin/common/KylinConfigBase.java   |  6 +-
 .../src/main/resources/kylin-defaults.properties   |  6 +-
 .../kylin/job/execution/ExecutableManager.java     | 26 +++----
 .../common/logging/SparkExecutorHdfsAppender.java  | 24 +++++-
 .../engine/spark/application/SparkApplication.java |  9 ++-
 .../kylin/engine/spark/job/NSparkExecutable.java   | 87 +++++++++++++---------
 .../scala/org/apache/spark/sql/KylinSession.scala  |  4 +-
 .../kylin/rest/controller/JobController.java       | 25 +++----
 .../org/apache/kylin/rest/service/JobService.java  |  4 +-
 10 files changed, 110 insertions(+), 85 deletions(-)

diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
index 7cc5b04..fb5b7e3 100644
--- a/build/conf/spark-executor-log4j.properties
+++ b/build/conf/spark-executor-log4j.properties
@@ -27,7 +27,7 @@ log4j.appender.stderr.target=System.err
 log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
 
 
-log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsAppender
 
 log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
 log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
@@ -41,6 +41,6 @@ log4j.appender.hdfs.logQueueCapacity=5000
 #flushPeriod count as millis
 log4j.appender.hdfs.flushInterval=5000
 
-log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
 #Don't add line number (%L) as it's too costly!
 log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e2727fa..01dc374 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2863,8 +2863,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.query.intersect.separator", "|");
     }
 
-    @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
-    public String sparderFiles() {
+    /**
+     * Used to upload user-defined log4j configuration
+     */
+    public String sparkUploadFiles() {
         try {
             File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
                     "spark-executor-log4j.properties");
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 8fba3bc..858278f 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
-kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -269,7 +269,6 @@ kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -D
 # uncomment for HDP
 #kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-#kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
 
 ### SPARK QUERY ENGINE CONFIGS (a.k.a. Sparder Context) ###
 # Enlarge cores and memory to improve query performance in production env, please check https://cwiki.apache.org/confluence/display/KYLIN/User+Manual+4.X
@@ -287,11 +286,10 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
 #kylin.query.spark-conf.spark.sql.shuffle.partitions=40
 #kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
 
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
+kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project}
 # uncomment for HDP
 #kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-#kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
 
 ### QUERY PUSH DOWN ###
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 590276d..42a9c99 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.job.execution;
 
@@ -255,10 +255,6 @@ public class ExecutableManager {
         return result;
     }
 
-    public Output getOutputFromHDFSByJobId(String jobId) {
-        return getOutputFromHDFSByJobId(jobId, jobId);
-    }
-
     public Output getOutputFromHDFSByJobId(String jobId, String stepId) {
         return getOutputFromHDFSByJobId(jobId, stepId, 100);
     }
@@ -593,7 +589,7 @@ public class ExecutableManager {
                 logger.warn("The job " + jobId + " has been discarded.");
             }
             throw new IllegalStateException(
-                "The job " + job.getId() + " has already been finished and cannot be discarded.");
+                    "The job " + job.getId() + " has already been finished and cannot be discarded.");
         }
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -617,7 +613,7 @@ public class ExecutableManager {
             for (AbstractExecutable task : tasks) {
                 if (task.getId().compareTo(stepId) >= 0) {
                     logger.debug("rollback task : " + task);
-                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath());
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", task.getLogPath());
                 }
             }
         }
@@ -634,11 +630,11 @@ public class ExecutableManager {
         }
 
         if (!(job.getStatus() == ExecutableState.READY
-            || job.getStatus() == ExecutableState.RUNNING)) {
+                || job.getStatus() == ExecutableState.RUNNING)) {
             logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
-                + ". It's final state and cannot be transfer to be stopped!!!");
+                    + ". It's final state and cannot be transfer to be stopped!!!");
             throw new IllegalStateException(
-                "The job " + job.getId() + " has already been finished and cannot be stopped.");
+                    "The job " + job.getId() + " has already been finished and cannot be stopped.");
         }
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -662,7 +658,6 @@ public class ExecutableManager {
     }
 
     public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) {
-        // when 
         if (Thread.currentThread().isInterrupted()) {
             throw new RuntimeException("Current thread is interruptted, aborting");
         }
@@ -701,7 +696,6 @@ public class ExecutableManager {
         }
 
         if (project != null) {
-            //write output to HDFS
             updateJobOutputToHDFS(project, jobId, output, logPath);
         }
     }
@@ -715,7 +709,7 @@ public class ExecutableManager {
             jobOutput.setLogPath(logPath);
         }
         String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId);
-
+        logger.debug("Update JobOutput To HDFS for {} to {} [{}]", jobId, outputHDFSPath, jobOutput.getContent() != null ? jobOutput.getContent().length() : -1);
         updateJobOutputToHDFS(outputHDFSPath, jobOutput);
     }
 
@@ -786,7 +780,7 @@ public class ExecutableManager {
                 if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
                     continue;
                 } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
-                    updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null);
+                    updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", null);
                 }
                 break;
             }
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index 4a76022..381fd2d 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.kylin.engine.spark.common.logging;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -111,7 +129,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
 
     @Override
     String getAppenderName() {
-        return "SparkExecutorHdfsLogAppender";
+        return "SparkExecutorHdfsAppender";
     }
 
     @Override
@@ -164,6 +182,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
             String rollingDir = dateFormat.format(new Date(event.getTimeStamp()));
             outPutPath = getOutPutDir(rollingDir);
         }
+        LogLog.warn("Update to " + outPutPath);
     }
 
     private String getOutPutDir(String rollingDir) {
@@ -236,8 +255,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     }
 
     private String parseHdfsWordingDir() {
-        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
-                + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/");
     }
 }
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 84737cb..4ec461a 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -135,8 +135,13 @@ public abstract class SparkApplication {
                 }
                 if (!config.getSparkConfigOverride().isEmpty()) {
                     for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
-                        logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
-                        sparkConf.set(entry.getKey(), entry.getValue());
+                        if (entry.getKey().contains("spark.executor.extraJavaOptions")) {
+                            // Just let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded(in JobServer) to determine executor's JVM level configuration
+                            logger.info("Do not override {}={}.", entry.getKey(), entry.getValue());
+                        } else {
+                            logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
+                            sparkConf.set(entry.getKey(), entry.getValue());
+                        }
                     }
                 }
             } else if (!isJobOnCluster(sparkConf)) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 3cc1b60..987c215 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -122,8 +122,8 @@ public class NSparkExecutable extends AbstractExecutable {
         String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
         logger.info("write hadoop conf is {} ", config.getBuildConf());
         if (!config.getBuildConf().isEmpty()) {
-               logger.info("write hadoop conf is {} ", config.getBuildConf());
-               hadoopConf = config.getBuildConf();
+            logger.info("write hadoop conf is {} ", config.getBuildConf());
+            hadoopConf = config.getBuildConf();
         }
         if (StringUtils.isEmpty(hadoopConf) && !config.isUTEnv() && !config.isZKLocal()) {
             throw new RuntimeException(
@@ -203,7 +203,7 @@ public class NSparkExecutable extends AbstractExecutable {
         String project = getParam(MetadataConstants.P_PROJECT_NAME);
         Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
 
-        HashMap<String, String> jobOverrides = new HashMap();
+        HashMap<String, String> jobOverrides = new HashMap<>();
         String parentId = getParentId();
         jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
         jobOverrides.put("job.project", project);
@@ -250,7 +250,7 @@ public class NSparkExecutable extends AbstractExecutable {
     }
 
     private ExecuteResult runSparkSubmit(KylinConfig config, String hadoopConf, String jars,
-            String kylinJobJar, String appArgs, String jobId) {
+                                         String kylinJobJar, String appArgs, String jobId) {
         PatternedLogger patternedLogger;
         if (config.isJobLogPrintEnabled()) {
             patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
@@ -302,16 +302,21 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
-        replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride);
+        replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride);
         return sparkConfigOverride;
     }
 
-    private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) {
-        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
+    /**
+     * Add properties to spark.xxx.extraJavaOptions for AbstractHdfsLogAppender.
+     * Please check the following for details:
+     * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties
+     * 2. AbstractHdfsLogAppender
+     */
+    private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config,
+                                                     Map<String, String> sparkConfigOverride) {
+        String sparkNodeExtraJavaOptionsKey = isDriver ? "spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions";
         StringBuilder sb = new StringBuilder();
-        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
-            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
-        }
 
         String serverIp = "127.0.0.1";
         try {
@@ -320,31 +325,37 @@ public class NSparkExecutable extends AbstractExecutable {
             logger.warn("use the InetAddress get local ip failed!", e);
         }
         String serverPort = config.getServerPort();
-
         String hdfsWorkingDir = config.getHdfsWorkingDirectory();
-
-        String sparkDriverHdfsLogPath = null;
-        if (config instanceof KylinConfigExt) {
-            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
-            if (Objects.nonNull(extendedOverrides)) {
-                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+        String log4jConfiguration;
+
+        if (isDriver) {
+            String sparkDriverHdfsLogPath;
+            if (config instanceof KylinConfigExt) {
+                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+                if (Objects.nonNull(extendedOverrides)) {
+                    sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+                    sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+                }
             }
-        }
-
-        /*
-        if (config.isCloud()) {
-            String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory();
-            if (StringUtils.isNotBlank(logLocalWorkingDirectory)) {
-                hdfsWorkingDir = logLocalWorkingDirectory;
-                sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
+            log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
+            sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
+            sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
+        } else {
+            if (config instanceof KylinConfigExt) {
+                Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+                if (Objects.nonNull(extendedOverrides)) {
+                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project")));
+                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId")));
+                    sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id")));
+                }
             }
+            sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix()));
         }
-         */
 
-        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
-        sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
         sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
-
         if (config.isKerberosEnabled()) {
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
             sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath()));
@@ -354,16 +365,17 @@ public class NSparkExecutable extends AbstractExecutable {
             }
         }
         sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-        sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
-        sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
+
+        if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) {
+            sb.append(" ").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey));
+        }
+        logger.debug("Final property is set to <<{}>> for {}.", sb.toString(), sparkNodeExtraJavaOptionsKey);
+        // Replace the original Java options for the Spark driver/executor
+        sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString());
     }
 
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
-            String appArgs) {
+                                      String appArgs) {
         StringBuilder sb = new StringBuilder();
         sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
 
@@ -377,11 +389,12 @@ public class NSparkExecutable extends AbstractExecutable {
         if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) {
             jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars");
         }
-
+        sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
         sb.append("--name job_step_%s ");
         sb.append("--jars %s %s %s");
         String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, KylinConfig.getSparkHome(), getId(), jars, kylinJobJar,
                 appArgs);
+        // SparkConf still has a chance to be changed in CubeBuildJob.java (Spark Driver)
         logger.info("spark submit cmd: {}", cmd);
         return cmd;
     }
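To make the string that replaceSparkNodeJavaOpsConfIfNeeded builds more concrete, the sketch below composes a driver-side value with hypothetical paths and ids; the real values come from KylinConfig/KylinConfigExt. The resulting string is put into spark.driver.extraJavaOptions, and the ${...} placeholders in conf/spark-driver-log4j.properties are later resolved from these -D system properties.

    import java.util.Locale;

    public class ExtraJavaOptionsSketch {
        public static void main(String[] args) {
            // Hypothetical values; the real ones come from KylinConfig / KylinConfigExt.
            String hdfsLogPath = "hdfs://cluster/kylin/demo_project/spark_logs/driver/step-1234.log";
            String serverIp = "127.0.0.1";
            String serverPort = "7070";
            String stepId = "step-1234";

            StringBuilder sb = new StringBuilder();
            sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", hdfsLogPath));
            sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", "file:/path/to/conf/spark-driver-log4j.properties"));
            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
            sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
            sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", stepId));

            // This string becomes spark.driver.extraJavaOptions; log4j later resolves the ${...}
            // placeholders in spark-driver-log4j.properties from these -D system properties.
            System.out.println(sb.toString().trim());
        }
    }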
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
index fb09d38..7ae0937 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -174,10 +174,10 @@ object KylinSession extends Logging {
         if (sparkConf.get("spark.master").startsWith("yarn")) {
           sparkConf.set("spark.yarn.dist.jars",
             KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath)
-          sparkConf.set("spark.yarn.dist.files", conf.sparderFiles())
+          sparkConf.set("spark.yarn.dist.files", conf.sparkUploadFiles())
         } else {
           sparkConf.set("spark.jars", conf.sparderJars)
-          sparkConf.set("spark.files", conf.sparderFiles())
+          sparkConf.set("spark.files", conf.sparkUploadFiles())
         }
 
         val fileName = KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index c28362c..cd63ac7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -20,12 +20,14 @@ package org.apache.kylin.rest.controller;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Locale;
 
+import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
@@ -164,29 +166,20 @@ public class JobController extends BasicController {
 
     /**
      * Get a job step output
-     * 
-     * @return
-     * @throws IOException
      */
-
     @RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
             "application/json" })
     @ResponseBody
     public Map<String, String> getStepOutput(@PathVariable String jobId, @PathVariable String stepId) {
-        Map<String, String> result = new HashMap<String, String>();
+        Map<String, String> result = new HashMap<>();
         result.put("jobId", jobId);
         result.put("stepId", String.valueOf(stepId));
-        result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
+        result.put("cmd_output", jobService.getJobStepOutput(jobId, stepId));
         return result;
     }
 
     /**
-     * Download a job step output from hdfs
-     * @param jobId
-     * @param stepId
-     * @param project
-     * @param response
-     * @return
+     * Download a step output (Spark driver log) from HDFS
      */
     @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
     @ResponseBody
@@ -196,10 +189,12 @@ public class JobController extends BasicController {
         checkRequiredArg("job_id", jobId);
         checkRequiredArg("step_id", stepId);
         checkRequiredArg("project", project);
-        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+        String validatedPrj = CliCommandExecutor.checkParameter(project);
+        String validatedStepId = CliCommandExecutor.checkParameter(stepId);
+        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", validatedPrj, validatedStepId);
 
-        String jobOutput = jobService.getAllJobOutput(jobId, stepId);
-        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+        String jobOutput = jobService.getAllJobStepOutput(jobId, stepId);
+        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes(StandardCharsets.UTF_8)), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
         return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
     }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 2c0c20c..90ee782 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -480,12 +480,12 @@ public class JobService extends BasicService implements InitializingBean {
         return getExecutableManager().getOutput(id);
     }
 
-    public String getJobOutput(String jobId, String stepId) {
+    public String getJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
     }
 
-    public String getAllJobOutput(String jobId, String stepId) {
+    public String getAllJobStepOutput(String jobId, String stepId) {
         ExecutableManager executableManager = getExecutableManager();
         return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
     }
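A hedged client-side sketch of fetching a step output through the endpoints above (GET .../output backed by getJobStepOutput, and .../log for the full driver log download). The /kylin/api/jobs prefix, port 7070 and ADMIN:KYLIN credentials are assumptions about a default deployment, not part of this change.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    public class StepOutputClientSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical job/step ids and endpoint prefix; adjust host, port and credentials for your deployment.
            String jobId = "a1b2c3d4";
            String stepId = "a1b2c3d4-01";
            URL url = new URL("http://localhost:7070/kylin/api/jobs/" + jobId + "/steps/" + stepId + "/output");

            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");
            String auth = Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes(StandardCharsets.UTF_8));
            conn.setRequestProperty("Authorization", "Basic " + auth);

            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // JSON with jobId, stepId and cmd_output
                }
            }
        }
    }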


[kylin] 01/06: KYLIN-4813 Refactor spark build log

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2dca9ef9a52cf1adb12684e0ba576b91e96b3792
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Nov 12 17:50:47 2020 +0800

    KYLIN-4813 Refactor spark build log
---
 .../engine/mr/common/MapReduceExecutable.java      |   7 +-
 build/bin/kylin-port-replace-util.sh               |   1 +
 build/conf/kylin-parquet-log4j.properties          |  33 --
 build/conf/kylin-spark-log4j.properties            |  43 ---
 build/conf/spark-driver-log4j.properties           |  45 +++
 .../org/apache/kylin/common/KylinConfigBase.java   |  26 +-
 .../common/logging/SensitivePatternLayout.java     |  62 ++++
 .../src/main/resources/kylin-defaults.properties   |   3 +
 .../apache/kylin/job/dao/ExecutableOutputPO.java   |  11 +
 .../kylin/job/execution/AbstractExecutable.java    |  19 +-
 .../job/execution/DefaultChainedExecutable.java    |  18 +-
 .../kylin/job/execution/ExecutableManager.java     | 263 +++++++++++++-
 .../apache/kylin/job/ExecutableManagerTest.java    |  20 +-
 .../job/impl/threadpool/DefaultSchedulerTest.java  |   4 +-
 .../common/logging/AbstractHdfsLogAppender.java    | 377 +++++++++++++++++++++
 .../common/logging/SparkDriverHdfsLogAppender.java | 112 ++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  81 +++--
 .../kylin/rest/controller/BasicController.java     |  16 +
 .../kylin/rest/controller/JobController.java       |  34 +-
 .../org/apache/kylin/rest/service/JobService.java  |  11 +
 20 files changed, 1044 insertions(+), 142 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index a33f171..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -57,6 +57,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +86,7 @@ public class MapReduceExecutable extends AbstractExecutable {
         if (output.getExtra().containsKey(START_TIME)) {
             final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
             if (mrJobId == null) {
-                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
                 return;
             }
             try {
@@ -96,7 +97,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     //remove previous mr job info
                     super.onExecuteStart(executableContext);
                 } else {
-                    getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                    getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
                 }
             } catch (IOException | ParseException e) {
                 logger.warn("error get hadoop status");
@@ -180,7 +181,7 @@ public class MapReduceExecutable extends AbstractExecutable {
 
                 JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
                 if (status == JobStepStatusEnum.KILLED) {
-                    mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
+                    mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin", null);
                     if (isDiscarded()) {
                         if (getIsNeedLock()) {
                             releaseLock(lock);
diff --git a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh
index ea96791..12a2abd 100755
--- a/build/bin/kylin-port-replace-util.sh
+++ b/build/bin/kylin-port-replace-util.sh
@@ -96,6 +96,7 @@ then
     sed -i "s/^kylin\.stream\.node=.*$/$stream_node/g" ${KYLIN_CONFIG_FILE}
 
     sed -i "s/#*kylin.server.cluster-servers=\(.*\).*:\(.*\)/kylin.server.cluster-servers=\1:${new_kylin_port}/g" ${KYLIN_CONFIG_FILE}
+    sed -i "s/#*server.port=.*$/server.port=${new_kylin_port}/g" ${KYLIN_CONFIG_FILE}
 
     echo "New kylin port is : ${new_kylin_port}"
 
diff --git a/build/conf/kylin-parquet-log4j.properties b/build/conf/kylin-parquet-log4j.properties
deleted file mode 100644
index 36b7dd4..0000000
--- a/build/conf/kylin-parquet-log4j.properties
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#  
-#     http://www.apache.org/licenses/LICENSE-2.0
-#  
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-#overall config
-log4j.rootLogger=WARN,stdout
-log4j.logger.org.apache.kylin=DEBUG
-log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=WARN
-log4j.logger.org.apache.spark=WARN
-# For the purpose of getting Tracking URL
-log4j.logger.org.apache.spark.deploy.yarn=INFO
-log4j.logger.org.apache.spark.ContextCleaner=WARN
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.err
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
\ No newline at end of file
diff --git a/build/conf/kylin-spark-log4j.properties b/build/conf/kylin-spark-log4j.properties
deleted file mode 100644
index 948fb32..0000000
--- a/build/conf/kylin-spark-log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-log4j.rootCategory=WARN,stderr,stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark-project.jetty=WARN
-log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
-log4j.logger.org.apache.parquet=ERROR
-log4j.logger.parquet=ERROR
-
-# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
-log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
-log4j.logger.org.apache.spark.sql=WARN
-
-log4j.logger.org.apache.kylin=DEBUG
\ No newline at end of file
diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
new file mode 100644
index 0000000..8b0e82d
--- /dev/null
+++ b/build/conf/spark-driver-log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#overall config
+log4j.rootLogger=INFO,hdfs
+log4j.logger.org.apache.kylin=DEBUG
+log4j.logger.org.springframework=WARN
+log4j.logger.org.springframework.security=WARN
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark.ContextCleaner=WARN
+
+# hdfs file appender
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
+log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled}
+log4j.appender.hdfs.kerberosPrincipal=${kylin.kerberos.principal}
+log4j.appender.hdfs.kerberosKeytab=${kylin.kerberos.keytab}
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.logPath=${spark.driver.log4j.appender.hdfs.File}
+log4j.appender.hdfs.logQueueCapacity=5000
+# flushInterval is in milliseconds
+log4j.appender.hdfs.flushInterval=5000
+log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+log4j.appender.logFile=org.apache.log4j.FileAppender
+log4j.appender.logFile.Threshold=DEBUG
+log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log
+log4j.appender.logFile.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.logFile.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
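The ${...} placeholders above are ordinary log4j 1.x variable substitutions, resolved from JVM system properties that NSparkExecutable injects through spark.driver.extraJavaOptions. A small sketch of that resolution, with hypothetical values:

    import java.util.Properties;
    import org.apache.log4j.helpers.OptionConverter;

    public class Log4jSubstSketch {
        public static void main(String[] args) {
            // Hypothetical values; in a real build job they are passed as -D options
            // through spark.driver.extraJavaOptions (see NSparkExecutable).
            System.setProperty("spark.driver.local.logDir", "/opt/kylin/logs/spark");
            System.setProperty("spark.driver.param.taskId", "step-1234");

            String raw = "${spark.driver.local.logDir}/${spark.driver.param.taskId}.log";
            // Log4j 1.x resolves ${...} from system properties first, then from the given Properties.
            String resolved = OptionConverter.substVars(raw, new Properties());
            System.out.println(resolved); // /opt/kylin/logs/spark/step-1234.log
        }
    }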
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8e53a9b..081d23f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -258,6 +258,8 @@ public abstract class KylinConfigBase implements Serializable {
 
     final protected void reloadKylinConfig(Properties properties) {
         this.properties = BCC.check(properties);
+        setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
+        setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
     }
 
     private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -465,6 +467,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getMetadataUrl().getIdentifier();
     }
 
+    public String getServerPort() {
+        return getOptional("server.port", "7070");
+    }
+
     public Map<String, String> getResourceStoreImpls() {
         Map<String, String> r = Maps.newLinkedHashMap();
         // ref constants in ISourceAware
@@ -866,6 +872,14 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.job.log-dir", "/tmp/kylin/logs");
     }
 
+    public String getKylinLogDir() {
+        String kylinHome = getKylinHome();
+        if (kylinHome == null) {
+            kylinHome = System.getProperty("KYLIN_HOME");
+        }
+        return kylinHome + File.separator + "logs";
+    }
+
     public boolean getRunAsRemoteCommand() {
         return Boolean.parseBoolean(getOptional("kylin.job.use-remote-cli"));
     }
@@ -2664,6 +2678,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getHdfsWorkingDirectory() + project + "/job_tmp/";
     }
 
+    public String getSparkLogDir(String project) {
+        return getHdfsWorkingDirectory() + project + "/spark_logs/driver/";
+    }
+
     @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public int getPersistFlatTableThreshold() {
         return Integer.parseInt(getOptional("kylin.engine.persist-flattable-threshold", "1"));
@@ -2807,8 +2825,8 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.query.spark-engine.enabled", "true"));
     }
 
-    public String getLogSparkPropertiesFile() {
-        return getLogPropertyFile("kylin-parquet-log4j.properties");
+    public String getLogSparkDriverPropertiesFile() {
+        return getLogPropertyFile("spark-driver-log4j.properties");
     }
 
     private String getLogPropertyFile(String filename) {
@@ -2875,6 +2893,10 @@ public abstract class KylinConfigBase implements Serializable {
                 .parseBoolean(this.getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true"));
     }
 
+    public String getJobOutputStorePath(String project, String jobId) {
+        return getSparkLogDir(project) + getNestedPath(jobId) + "execute_output.json";
+    }
+
     public int getBaseShufflePartitionSize() {
         return Integer.parseInt(this.getOptional("kylin.query.pushdown.base-shuffle-partition-size", "48"));
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
new file mode 100644
index 0000000..013d42d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.logging;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SensitivePatternLayout extends PatternLayout {
+    private static final String PREFIX_GROUP_NAME = "prefix";
+    private static final String SENSITIVE_GROUP_NAME = "sensitive";
+    private static final String MASK = "******";
+    private static final Pattern SENSITIVE_PATTERN = Pattern.compile(
+            String.format(Locale.ROOT, "(?<%s>password\\s*[:=])(?<%s>[^,.!]*)", PREFIX_GROUP_NAME, SENSITIVE_GROUP_NAME),
+            Pattern.CASE_INSENSITIVE);
+
+    @Override
+    public String format(LoggingEvent event) {
+        if (event.getMessage() instanceof String) {
+            String maskedMessage = mask(event.getRenderedMessage());
+
+            Throwable throwable = event.getThrowableInformation() != null
+                    ? event.getThrowableInformation().getThrowable()
+                    : null;
+            LoggingEvent maskedEvent = new LoggingEvent(event.fqnOfCategoryClass,
+                    Logger.getLogger(event.getLoggerName()), event.timeStamp, event.getLevel(), maskedMessage,
+                    throwable);
+
+            return super.format(maskedEvent);
+        }
+        return super.format(event);
+    }
+
+    private String mask(String message) {
+        Matcher matcher = SENSITIVE_PATTERN.matcher(message);
+        if (matcher.find()) {
+            return matcher.replaceAll(String.format(Locale.ROOT, "${%s}%s", PREFIX_GROUP_NAME, MASK));
+        }
+        return message;
+    }
+}
+
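A quick standalone demonstration of the masking rule above, using the same regex (reproduced here only for illustration; the message is made up):

    import java.util.Locale;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class MaskingSketch {
        public static void main(String[] args) {
            // Same pattern as SensitivePatternLayout, with the named groups "prefix" and "sensitive".
            Pattern sensitive = Pattern.compile(
                    String.format(Locale.ROOT, "(?<%s>password\\s*[:=])(?<%s>[^,.!]*)", "prefix", "sensitive"),
                    Pattern.CASE_INSENSITIVE);

            String message = "connect with user=ADMIN, password=KYLIN, url=jdbc:mysql://host/kylin";
            Matcher matcher = sensitive.matcher(message);
            String masked = matcher.find() ? matcher.replaceAll("${prefix}******") : message;

            // Prints: connect with user=ADMIN, password=******, url=jdbc:mysql://host/kylin
            System.out.println(masked);
        }
    }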
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index eb8398b..3d8268f 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -44,6 +44,9 @@ kylin.env.zookeeper-connect-string=sandbox.hortonworks.com
 # Kylin server mode, valid value [all, query, job]
 kylin.server.mode=all
 
+## Kylin server port
+server.port=7070
+
 # List of web servers in use, this enables one web server instance to sync up with other servers.
 kylin.server.cluster-servers=localhost:7070
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
index fdc6cc4..27a2831 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -38,6 +38,9 @@ public class ExecutableOutputPO extends RootPersistentEntity {
     @JsonProperty("status")
     private String status = "READY";
 
+    @JsonProperty("log_path")
+    private String logPath;
+
     @JsonProperty("info")
     private Map<String, String> info = Maps.newHashMap();
 
@@ -64,4 +67,12 @@ public class ExecutableOutputPO extends RootPersistentEntity {
     public void setInfo(Map<String, String> info) {
         this.info = info;
     }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public String getLogPath() {
+        return logPath;
+    }
 }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 202d22e..d815e5b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -82,6 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     private Map<String, String> params = Maps.newHashMap();
     protected Integer priority;
     private CubeBuildTypeEnum jobType;
+    private String logPath;
     protected String project;
     private String targetSubject;
     private List<String> targetSegments = Lists.newArrayList();//uuid of related segments
@@ -95,6 +96,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         this.config = config;
     }
 
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public String getLogPath() {
+        return logPath;
+    }
+
     protected KylinConfig getConfig() {
         return config;
     }
@@ -107,7 +116,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         checkJobPaused();
         Map<String, String> info = Maps.newHashMap();
         info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+        getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath());
     }
 
     public KylinConfig getCubeSpecificConfig() {
@@ -151,9 +160,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         setEndTime(System.currentTimeMillis());
         if (!isDiscarded() && !isRunnable()) {
             if (result.succeed()) {
-                getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, result.output(), getLogPath());
             } else {
-                getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath());
             }
         }
     }
@@ -168,7 +177,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 exception.printStackTrace(new PrintWriter(out));
                 output = out.toString();
             }
-            getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output);
+            getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, output, getLogPath());
         }
     }
 
@@ -587,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     public final String getProject() {
         if (project == null) {
-            throw new IllegalStateException("project is not set for abstract executable " + getId());
+            logger.error("project is not set for abstract executable " + getId());
         }
         return project;
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index bbeaf6a..c0f6d87 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -101,11 +101,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     protected void onExecuteStart(ExecutableContext executableContext) {
         final long startTime = getStartTime();
         if (startTime > 0) {
-            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+            getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
         } else {
             Map<String, String> info = Maps.newHashMap();
             info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+            getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath());
         }
         getManager().addJobInfo(getId(), BUILD_INSTANCE, DistributedLockFactory.processAndHost());
     }
@@ -137,8 +137,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                             "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n"
                                     + "it might cause endless state, will retry to fetch subtask's state.",
                             task.getId(), task.getName());
-                    getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null,
-                            "killed due to inconsistent state");
+                    getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.ERROR, null,
+                            "killed due to inconsistent state", getLogPath());
                     hasError = true;
                 }
 
@@ -156,21 +156,21 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             }
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
-                mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, null, getLogPath());
                 onStatusChange(executableContext, result, ExecutableState.SUCCEED);
             } else if (hasError) {
                 setEndTime(System.currentTimeMillis());
-                mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, null, getLogPath());
                 onStatusChange(executableContext, result, ExecutableState.ERROR);
             } else if (hasDiscarded) {
                 setEndTime(System.currentTimeMillis());
-                mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.DISCARDED, null, null, getLogPath());
             } else {
-                mgr.updateJobOutput(getId(), ExecutableState.READY, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.READY, null, null, getLogPath());
             }
         } else {
             setEndTime(System.currentTimeMillis());
-            mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+            mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath());
             onStatusChange(executableContext, result, ExecutableState.ERROR);
         }
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index ecc2c2e..c9ae8a4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -23,7 +23,12 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
 import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
 
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.IllegalFormatException;
@@ -31,18 +36,28 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Deque;
+import java.util.ArrayDeque;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.JobProcessContext;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.IllegalStateTranferException;
 import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,6 +255,197 @@ public class ExecutableManager {
         return result;
     }
 
+    public Output getOutputFromHDFSByJobId(String jobId) {
+        return getOutputFromHDFSByJobId(jobId, jobId);
+    }
+
+    public Output getOutputFromHDFSByJobId(String jobId, String stepId) {
+        return getOutputFromHDFSByJobId(jobId, stepId, 100);
+    }
+
+    /**
+     * Get job output from the HDFS json file;
+     * if the json file contains a logPath (the Spark driver log's HDFS path, *.json.log),
+     * read sample data from that log file.
+     *
+     * @param jobId
+     * @return
+     */
+    public Output getOutputFromHDFSByJobId(String jobId, String stepId, int nLines) {
+        AbstractExecutable jobInstance = getJob(jobId);
+        String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId);
+        ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath);
+        assertOutputNotNull(jobOutput, outputStorePath);
+
+        if (Objects.nonNull(jobOutput.getLogPath())) {
+            if (isHdfsPathExists(jobOutput.getLogPath())) {
+                jobOutput.setContent(getSampleDataFromHDFS(jobOutput.getLogPath(), nLines));
+            } else if (StringUtils.isEmpty(jobOutput.getContent()) && Objects.nonNull(getJob(jobId))
+                    && getJob(jobId).getStatus() == ExecutableState.RUNNING) {
+                jobOutput.setContent("Wait a moment ... ");
+            }
+        }
+
+        return parseOutput(jobOutput);
+    }
+
+    public ExecutableOutputPO getJobOutputFromHDFS(String resPath) {
+        DataInputStream din = null;
+        try {
+            Path path = new Path(resPath);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            if (!fs.exists(path)) {
+                ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+                executableOutputPO.setContent("job output not found, please check kylin.log");
+                return executableOutputPO;
+            }
+
+            din = fs.open(path);
+            return JsonUtil.readValue(din, ExecutableOutputPO.class);
+        } catch (Exception e) {
+            // If the output file on hdfs is corrupt, give an empty output
+            logger.error("get job output [{}] from HDFS failed.", resPath, e);
+            ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+            executableOutputPO.setContent("job output broken, please check kylin.log");
+            return executableOutputPO;
+        } finally {
+            IOUtils.closeQuietly(din);
+        }
+    }
+
+    private void assertOutputNotNull(ExecutableOutputPO output, String idOrPath) {
+        com.google.common.base.Preconditions.checkArgument(output != null, "there is no related output for job :" + idOrPath);
+    }
+
+    /**
+     * Check whether the HDFS path exists.
+     *
+     * @param hdfsPath
+     * @return
+     */
+    public boolean isHdfsPathExists(String hdfsPath) {
+        if (StringUtils.isBlank(hdfsPath)) {
+            return false;
+        }
+
+        Path path = new Path(hdfsPath);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        try {
+            return fs.exists(path);
+        } catch (IOException e) {
+            logger.error("check the hdfs path [{}] exists failed, ", hdfsPath, e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Get sample data from an HDFS log file.
+     * Given the number of lines, return the first nLines and the last nLines of the file.
+     *
+     * @param resPath
+     * @return
+     */
+    public String getSampleDataFromHDFS(String resPath, final int nLines) {
+        try {
+            Path path = new Path(resPath);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            if (!fs.exists(path)) {
+                return null;
+            }
+
+            FileStatus fileStatus = fs.getFileStatus(path);
+            try (FSDataInputStream din = fs.open(path);
+                 BufferedReader reader = new BufferedReader(new InputStreamReader(din, "UTF-8"))) {
+
+                String line;
+                StringBuilder sampleData = new StringBuilder();
+                for (int i = 0; i < nLines && (line = reader.readLine()) != null; i++) {
+                    if (sampleData.length() > 0) {
+                        sampleData.append('\n');
+                    }
+                    sampleData.append(line);
+                }
+
+                int offset = sampleData.toString().getBytes("UTF-8").length + 1;
+                if (offset < fileStatus.getLen()) {
+                    sampleData.append("\n================================================================\n");
+                    sampleData.append(tailHdfsFileInputStream(din, offset, fileStatus.getLen(), nLines));
+                }
+                return sampleData.toString();
+            }
+        } catch (IOException e) {
+            logger.error("get sample data from hdfs log file [{}] failed!", resPath, e);
+            return null;
+        }
+    }
+
+    /**
+     * Get the last nLines lines from the end of an HDFS file input stream;
+     * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
+     *
+     * @param hdfsDin
+     * @param startPos
+     * @param endPos
+     * @param nLines
+     * @return
+     * @throws IOException
+     */
+    private String tailHdfsFileInputStream(FSDataInputStream hdfsDin, final long startPos, final long endPos,
+                                           final int nLines) throws IOException {
+        com.google.common.base.Preconditions.checkNotNull(hdfsDin);
+        com.google.common.base.Preconditions.checkArgument(startPos < endPos && startPos >= 0);
+        com.google.common.base.Preconditions.checkArgument(nLines >= 0);
+
+        Deque<String> deque = new ArrayDeque<>();
+        int buffSize = 8192;
+        byte[] byteBuf = new byte[buffSize];
+
+        long pos = endPos;
+
+        // the log's last char is usually '\n'; skip it so the final empty line is not counted
+        hdfsDin.seek(pos - 1);
+        int lastChar = hdfsDin.read();
+        if ('\n' == lastChar) {
+            pos--;
+        }
+
+        int bytesRead = (int) ((pos - startPos) % buffSize);
+        if (bytesRead == 0) {
+            bytesRead = buffSize;
+        }
+
+        pos -= bytesRead;
+        int lines = nLines;
+        while (lines > 0 && pos >= startPos) {
+            bytesRead = hdfsDin.read(pos, byteBuf, 0, bytesRead);
+
+            int last = bytesRead;
+            for (int i = bytesRead - 1; i >= 0 && lines > 0; i--) {
+                if (byteBuf[i] == '\n') {
+                    deque.push(new String(byteBuf, i, last - i, StandardCharsets.UTF_8));
+                    lines--;
+                    last = i;
+                }
+            }
+
+            if (lines > 0 && last > 0) {
+                deque.push(new String(byteBuf, 0, last, StandardCharsets.UTF_8));
+            }
+
+            bytesRead = buffSize;
+            pos -= bytesRead;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        while (!deque.isEmpty()) {
+            sb.append(deque.pop());
+        }
+
+        return sb.length() > 0 && sb.charAt(0) == '\n' ? sb.substring(1) : sb.toString();
+    }
+
+
     public List<AbstractExecutable> getAllExecutables() {
         try {
             List<AbstractExecutable> ret = Lists.newArrayList();
@@ -342,12 +548,12 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (task.getStatus() == ExecutableState.RUNNING) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, null, task.getLogPath());
                     break;
                 }
             }
         }
-        updateJobOutput(jobId, ExecutableState.READY, null, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, null, null, job.getLogPath());
     }
 
     public void resumeJob(String jobId) {
@@ -360,7 +566,7 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (task.getStatus() == ExecutableState.ERROR || task.getStatus() == ExecutableState.STOPPED) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, "no output");
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, "no output", task.getLogPath());
                     break;
                 }
             }
@@ -372,7 +578,7 @@ public class ExecutableManager {
                 info.remove(AbstractExecutable.END_TIME);
             }
         }
-        updateJobOutput(jobId, ExecutableState.READY, info, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, info, null, job.getLogPath());
     }
 
     public void discardJob(String jobId) {
@@ -394,11 +600,11 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.DISCARDED, null, null, task.getLogPath());
                 }
             }
         }
-        updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.DISCARDED, null, null, job.getLogPath());
     }
 
     public void rollbackJob(String jobId, String stepId) {
@@ -412,13 +618,13 @@ public class ExecutableManager {
             for (AbstractExecutable task : tasks) {
                 if (task.getId().compareTo(stepId) >= 0) {
                     logger.debug("rollback task : " + task);
-                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath());
                 }
             }
         }
 
         if (job.getStatus() == ExecutableState.SUCCEED) {
-            updateJobOutput(job.getId(), ExecutableState.READY, null, null);
+            updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), job.getId(), ExecutableState.READY, null, null, job.getLogPath());
         }
     }
 
@@ -439,12 +645,12 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null);
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.STOPPED, null, null, task.getLogPath());
                     break;
                 }
             }
         }
-        updateJobOutput(jobId, ExecutableState.STOPPED, null, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.STOPPED, null, null, job.getLogPath());
     }
 
     public ExecutableOutputPO getJobOutput(String jobId) {
@@ -456,7 +662,7 @@ public class ExecutableManager {
         }
     }
 
-    public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
+    public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) {
         // when 
         if (Thread.currentThread().isInterrupted()) {
             throw new RuntimeException("Current thread is interruptted, aborting");
@@ -494,6 +700,39 @@ public class ExecutableManager {
             logger.error("error change job:" + jobId + " to " + newStatus);
             throw new RuntimeException(e);
         }
+
+        if (project != null) {
+            //write output to HDFS
+            updateJobOutputToHDFS(project, jobId, output, logPath);
+        }
+    }
+
+    public void updateJobOutputToHDFS(String project, String jobId, String output, String logPath) {
+        ExecutableOutputPO jobOutput = getJobOutput(jobId);
+        if (null != output) {
+            jobOutput.setContent(output);
+        }
+        if (null != logPath) {
+            jobOutput.setLogPath(logPath);
+        }
+        String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId);
+
+        updateJobOutputToHDFS(outputHDFSPath, jobOutput);
+    }
+
+    public void updateJobOutputToHDFS(String resPath, ExecutableOutputPO obj) {
+        DataOutputStream dout = null;
+        try {
+            Path path = new Path(resPath);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            dout = fs.create(path, true);
+            JsonUtil.writeValue(dout, obj);
+        } catch (Exception e) {
+            // the operation to update output to hdfs failed, next task should not be interrupted.
+            logger.error("update job output [{}] to HDFS failed.", resPath, e);
+        } finally {
+            IOUtils.closeQuietly(dout);
+        }
     }
 
     private boolean needDestroyProcess(ExecutableState from, ExecutableState to) {
@@ -548,7 +787,7 @@ public class ExecutableManager {
                 if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
                     continue;
                 } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
+                    updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null);
                 }
                 break;
             }
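getSampleDataFromHDFS keeps only the head and tail of a potentially large driver log (tailHdfsFileInputStream seeks backwards so the whole file is never loaded). Below is a simplified local-file analogue of the same idea, trading the backwards seek for readability; the path is hypothetical.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;

    public class LogSamplingSketch {
        // Local-file analogue of getSampleDataFromHDFS: keep the first and last nLines
        // of a log and elide the middle. Unlike the HDFS version, this loads the whole file.
        static String sample(Path logFile, int nLines) throws IOException {
            List<String> lines = Files.readAllLines(logFile);
            if (lines.size() <= 2 * nLines) {
                return String.join("\n", lines);
            }
            StringBuilder sb = new StringBuilder();
            sb.append(String.join("\n", lines.subList(0, nLines)));
            sb.append("\n================================================================\n");
            sb.append(String.join("\n", lines.subList(lines.size() - nLines, lines.size())));
            return sb.toString();
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical local path; the real implementation reads from the HDFS working directory.
            System.out.println(sample(Paths.get("/tmp/driver.log"), 100));
        }
    }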
diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
index 3535d37..3341dff 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
@@ -75,7 +75,7 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
         AbstractExecutable another = service.getJob(executable.getId());
         assertJobEqual(executable, another);
 
-        service.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, "test output");
+        service.updateJobOutput(null, executable.getId(), ExecutableState.RUNNING, null, "test output", null);
         assertJobEqual(executable, service.getJob(executable.getId()));
     }
 
@@ -98,21 +98,21 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
         SucceedTestExecutable job = new SucceedTestExecutable();
         String id = job.getId();
         service.addJob(job);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.ERROR, null, null);
-        service.updateJobOutput(id, ExecutableState.READY, null, null);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.READY, null, null);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.SUCCEED, null, null);
+        service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.ERROR, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.READY, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.READY, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.SUCCEED, null, null, null);
     }
 
     @Test(expected = IllegalStateTranferException.class)
     public void testInvalidStateTransfer() {
         SucceedTestExecutable job = new SucceedTestExecutable();
         service.addJob(job);
-        service.updateJobOutput(job.getId(), ExecutableState.ERROR, null, null);
-        service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null);
+        service.updateJobOutput(null, job.getId(), ExecutableState.ERROR, null, null, null);
+        service.updateJobOutput(null, job.getId(), ExecutableState.STOPPED, null, null, null);
     }
 
     private static void assertJobEqual(Executable one, Executable another) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 0855012..71fc19f 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -145,8 +145,8 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
         job.addTask(task1);
         job.addTask(task2);
         execMgr.addJob(job);
-        ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(task2.getId(),
-                ExecutableState.RUNNING, null, null);
+        ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(null, task2.getId(),
+                ExecutableState.RUNNING, null, null, null);
         waitForJobFinish(job.getId(), MAX_WAIT_TIME);
         Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState());
         Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
new file mode 100644
index 0000000..5a90fb2
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
+    private final Object flushLogLock = new Object();
+    private final Object initWriterLock = new Object();
+    private final Object closeLock = new Object();
+    private final Object fileSystemLock = new Object();
+
+    private FSDataOutputStream outStream = null;
+    private BufferedWriter bufferedWriter = null;
+
+    private FileSystem fileSystem = null;
+
+    private ExecutorService appendHdfsService = null;
+
+    private BlockingDeque<LoggingEvent> logBufferQue = null;
+    private static final double QUEUE_FLUSH_THRESHOLD = 0.2;
+
+    //configurable
+    private int logQueueCapacity = 8192;
+    private int flushInterval = 5000;
+    private String hdfsWorkingDir;
+
+    public int getLogQueueCapacity() {
+        return logQueueCapacity;
+    }
+
+    public void setLogQueueCapacity(int logQueueCapacity) {
+        this.logQueueCapacity = logQueueCapacity;
+    }
+
+    public BlockingDeque<LoggingEvent> getLogBufferQue() {
+        return logBufferQue;
+    }
+
+    public int getFlushInterval() {
+        return flushInterval;
+    }
+
+    public void setFlushInterval(int flushInterval) {
+        this.flushInterval = flushInterval;
+    }
+
+    public String getHdfsWorkingDir() {
+        return hdfsWorkingDir;
+    }
+
+    public void setHdfsWorkingDir(String hdfsWorkingDir) {
+        this.hdfsWorkingDir = hdfsWorkingDir;
+    }
+
+    public FileSystem getFileSystem() {
+        if (null == fileSystem) {
+            return getFileSystem(new Configuration());
+        }
+        return fileSystem;
+    }
+
+    private FileSystem getFileSystem(Configuration conf) {
+        synchronized (fileSystemLock) {
+            if (null == fileSystem) {
+                try {
+                    fileSystem = new Path(hdfsWorkingDir).getFileSystem(conf);
+                } catch (IOException e) {
+                    LogLog.error("Failed to create the file system, ", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return fileSystem;
+    }
+
+    public boolean isWriterInited() {
+        synchronized (initWriterLock) {
+            return null != bufferedWriter;
+        }
+    }
+
+    abstract void init();
+
+    abstract String getAppenderName();
+
+    /**
+     * init the resources the appender needs when it is activated.
+     */
+    @Override
+    public void activateOptions() {
+        LogLog.warn(String.format(Locale.ROOT, "%s starting ...", getAppenderName()));
+        LogLog.warn("hdfsWorkingDir -> " + getHdfsWorkingDir());
+
+        init();
+
+        logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity());
+        appendHdfsService = Executors.newSingleThreadExecutor();
+        appendHdfsService.execute(this::checkAndFlushLog);
+        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+
+        LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName()));
+    }
+
+    @Override
+    public void append(LoggingEvent loggingEvent) {
+        try {
+            boolean offered = logBufferQue.offer(loggingEvent, 10, TimeUnit.SECONDS);
+            if (!offered) {
+                LogLog.error("LogEvent cannot put into the logBufferQue, log event content:");
+                printLoggingEvent(loggingEvent);
+            }
+        } catch (InterruptedException e) {
+            LogLog.warn("Append logging event interrupted!", e);
+            // Restore interrupted state...
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (closeLock) {
+            if (!this.closed) {
+                this.closed = true;
+
+                List<LoggingEvent> transaction = Lists.newArrayList();
+                try {
+                    flushLog(getLogBufferQue().size(), transaction);
+
+                    closeWriter();
+                    if (appendHdfsService != null && !appendHdfsService.isShutdown()) {
+                        appendHdfsService.shutdownNow();
+                    }
+                } catch (Exception e) {
+                    transaction.forEach(this::printLoggingEvent);
+                    try {
+                        while (!getLogBufferQue().isEmpty()) {
+                            printLoggingEvent(getLogBufferQue().take());
+                        }
+                    } catch (Exception ie) {
+                        LogLog.error("clear the logging buffer queue failed!", ie);
+                    }
+                    LogLog.error(String.format(Locale.ROOT, "close %s failed!", getAppenderName()), e);
+                }
+                LogLog.warn(String.format(Locale.ROOT, "%s closed ...", getAppenderName()));
+            }
+        }
+    }
+
+    private void closeWriter() {
+        IOUtils.closeQuietly(bufferedWriter);
+        IOUtils.closeQuietly(outStream);
+    }
+
+    @Override
+    public boolean requiresLayout() {
+        return true;
+    }
+
+    /**
+     * sometimes the appender needs to wait until its dependent components are initialized.
+     *
+     * @return true if the flush check should be skipped for now
+     */
+    abstract boolean isSkipCheckAndFlushLog();
+
+    /**
+     * drain part of the log buffer queue (printing the dropped events to stderr) when it is full.
+     */
+    private void clearLogBufferQueueWhenBlocked() {
+        if (logBufferQue.size() >= getLogQueueCapacity()) {
+            int removeNum = getLogQueueCapacity() / 5;
+            while (removeNum > 0) {
+                try {
+                    LoggingEvent loggingEvent = logBufferQue.take();
+                    printLoggingEvent(loggingEvent);
+                } catch (Exception ex) {
+                    LogLog.error("Take event interrupted!", ex);
+                }
+                removeNum--;
+            }
+        }
+    }
+
+    /**
+     * print the logging event to stderr
+     * @param loggingEvent
+     */
+    private void printLoggingEvent(LoggingEvent loggingEvent) {
+        try {
+            String log = getLayout().format(loggingEvent);
+            LogLog.error(log.endsWith("\n") ? log.substring(0, log.length() - 1) : log);
+            if (null != loggingEvent.getThrowableStrRep()) {
+                for (String stack : loggingEvent.getThrowableStrRep()) {
+                    LogLog.error(stack);
+                }
+            }
+        } catch (Exception e) {
+            LogLog.error("print logging event failed!", e);
+        }
+    }
+
+    /**
+     * flush the log to hdfs when conditions are satisfied.
+     */
+    protected void checkAndFlushLog() {
+        long start = System.currentTimeMillis();
+        do {
+            List<LoggingEvent> transaction = Lists.newArrayList();
+            try {
+                if (isSkipCheckAndFlushLog()) {
+                    continue;
+                }
+
+                int eventSize = getLogBufferQue().size();
+                if (eventSize > getLogQueueCapacity() * QUEUE_FLUSH_THRESHOLD
+                        || System.currentTimeMillis() - start > getFlushInterval()) {
+                    // update the start time before flushLog so the interval is refreshed even if flushLog throws
+                    start = System.currentTimeMillis();
+                    flushLog(eventSize, transaction);
+                } else {
+                    Thread.sleep(getFlushInterval() / 100);
+                }
+            } catch (Exception e) {
+                transaction.forEach(this::printLoggingEvent);
+                clearLogBufferQueueWhenBlocked();
+                LogLog.error("Error occurred when consume event", e);
+            }
+        } while (!closed);
+    }
+
+    /**
+     * init the hdfs writer and create the hdfs file at outPath.
+     * kerberos authentication may be needed, so the fileSystem is initialized here as well.
+     *
+     * @param outPath
+     */
+    protected boolean initHdfsWriter(Path outPath, Configuration conf) {
+        synchronized (initWriterLock) {
+            closeWriter();
+            bufferedWriter = null;
+            outStream = null;
+
+            int retry = 10;
+            while (retry-- > 0) {
+                try {
+                    fileSystem = getFileSystem(conf);
+                    outStream = fileSystem.create(outPath, true);
+                    break;
+                } catch (Exception e) {
+                    LogLog.error("fail to create stream for path: " + outPath, e);
+                }
+
+                try {
+                    initWriterLock.wait(1000); // wait for the ACL to be switched to the current user
+                } catch (InterruptedException e) {
+                    LogLog.warn("Init writer interrupted!", e);
+                    // Restore interrupted state...
+                    Thread.currentThread().interrupt();
+                }
+            }
+            if (null != outStream) {
+                bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream, StandardCharsets.UTF_8));
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * write the data into the buffer.
+     *
+     * @param message
+     * @throws IOException
+     */
+    protected void write(String message) throws IOException {
+        bufferedWriter.write(message);
+    }
+
+    /**
+     * write the log event, including any error stack trace, into the buffer
+     *
+     * @param loggingEvent
+     * @throws IOException
+     */
+    protected void writeLogEvent(LoggingEvent loggingEvent) throws IOException {
+        if (null != loggingEvent) {
+            write(getLayout().format(loggingEvent));
+
+            if (null != loggingEvent.getThrowableStrRep()) {
+                for (String message : loggingEvent.getThrowableStrRep()) {
+                    write(message);
+                    write("\n");
+                }
+            }
+        }
+    }
+
+    /**
+     * write up to eventSize log events from the queue into the buffer.
+     *
+     * @param eventSize
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    abstract void doWriteLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException;
+
+    /**
+     * flush the buffer data to HDFS.
+     *
+     * @throws IOException
+     */
+    private void flush() throws IOException {
+        bufferedWriter.flush();
+        outStream.hsync();
+    }
+
+    /**
+     * take the given number of events from the queue and write them to HDFS immediately.
+     *
+     * @param eventSize
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected void flushLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+        if (eventSize <= 0) {
+            return;
+        }
+
+        synchronized (flushLogLock) {
+            if (eventSize > getLogBufferQue().size()) {
+                eventSize = getLogBufferQue().size();
+            }
+
+            doWriteLog(eventSize, transaction);
+
+            flush();
+        }
+    }
+}
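
Below is a minimal sketch (not part of the patch) of what a concrete subclass has to provide,
just to make the extension points of AbstractHdfsLogAppender explicit; the real implementations
are the SparkDriverHdfsLogAppender and SparkExecutorHdfsAppender classes in this patch set, and
a Layout still has to be configured since requiresLayout() returns true.

    package org.apache.kylin.engine.spark.common.logging;

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.log4j.spi.LoggingEvent;

    public class SimpleHdfsLogAppender extends AbstractHdfsLogAppender {
        // hypothetical bean property, set from the log4j configuration
        private String logPath;

        public void setLogPath(String logPath) {
            this.logPath = logPath;
        }

        @Override
        public void init() {
            // nothing extra to prepare in this sketch
        }

        @Override
        public String getAppenderName() {
            return "SimpleHdfsLogAppender";
        }

        @Override
        public boolean isSkipCheckAndFlushLog() {
            // never defer flushing in this sketch
            return false;
        }

        @Override
        public void doWriteLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException {
            if (!isWriterInited() && !initHdfsWriter(new Path(logPath), new Configuration())) {
                throw new IOException("cannot create the HDFS writer for " + logPath);
            }
            while (eventSize-- > 0) {
                LoggingEvent event = getLogBufferQue().take();
                transaction.add(event);
                writeLogEvent(event);
            }
        }
    }
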
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
new file mode 100644
index 0000000..0e5ad3c
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.common.logging;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SparkDriverHdfsLogAppender extends AbstractHdfsLogAppender {
+
+    private String logPath;
+
+    // kerberos
+    private boolean kerberosEnable = false;
+    private String kerberosPrincipal;
+    private String kerberosKeytab;
+
+    public String getLogPath() {
+        return logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public boolean isKerberosEnable() {
+        return kerberosEnable;
+    }
+
+    public void setKerberosEnable(boolean kerberosEnable) {
+        this.kerberosEnable = kerberosEnable;
+    }
+
+    public String getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public void setKerberosPrincipal(String kerberosPrincipal) {
+        this.kerberosPrincipal = kerberosPrincipal;
+    }
+
+    public String getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+
+    public void setKerberosKeytab(String kerberosKeytab) {
+        this.kerberosKeytab = kerberosKeytab;
+    }
+
+    @Override
+    public void init() {
+        LogLog.warn("spark.driver.log4j.appender.hdfs.File -> " + getLogPath());
+        LogLog.warn("kerberosEnable -> " + isKerberosEnable());
+        if (isKerberosEnable()) {
+            LogLog.warn("kerberosPrincipal -> " + getKerberosPrincipal());
+            LogLog.warn("kerberosKeytab -> " + getKerberosKeytab());
+        }
+    }
+
+    @Override
+    String getAppenderName() {
+        return "SparkDriverHdfsLogAppender";
+    }
+
+    @Override
+    public boolean isSkipCheckAndFlushLog() {
+        return false;
+    }
+
+    @Override
+    public void doWriteLog(int eventSize, List<LoggingEvent> transaction)
+            throws IOException, InterruptedException {
+        if (!isWriterInited()) {
+            Configuration conf = new Configuration();
+            if (isKerberosEnable()) {
+                UserGroupInformation.setConfiguration(conf);
+                UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
+            }
+            if (!initHdfsWriter(new Path(getLogPath()), conf)) {
+                LogLog.error("init the hdfs writer failed!");
+            }
+        }
+
+        while (eventSize > 0) {
+            LoggingEvent loggingEvent = getLogBufferQue().take();
+            transaction.add(loggingEvent);
+            writeLogEvent(loggingEvent);
+            eventSize--;
+        }
+    }
+}
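
In the build, this appender is wired in through the driver-side log4j properties file referenced
by config.getLogSparkDriverPropertiesFile() (see NSparkExecutable below) rather than in code. As
a rough, illustrative Java equivalent of that wiring, assuming the -D system properties that
NSparkExecutable appends to spark.driver.extraJavaOptions:

    import org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender;
    import org.apache.log4j.Logger;
    import org.apache.log4j.PatternLayout;

    public class DriverAppenderWiringSketch {
        public static void main(String[] args) {
            SparkDriverHdfsLogAppender appender = new SparkDriverHdfsLogAppender();
            // All values come from the driver JVM options set by NSparkExecutable.
            appender.setLogPath(System.getProperty("spark.driver.log4j.appender.hdfs.File"));
            appender.setHdfsWorkingDir(System.getProperty("kylin.hdfs.working.dir"));
            appender.setKerberosEnable(Boolean.parseBoolean(System.getProperty("kylin.kerberos.enabled", "false")));
            appender.setKerberosPrincipal(System.getProperty("kylin.kerberos.principal"));
            appender.setKerberosKeytab(System.getProperty("kylin.kerberos.keytab"));
            // The pattern below is a placeholder; the shipped properties file defines the real layout.
            appender.setLayout(new PatternLayout("%d{ISO8601} %p [%t] %c : %m%n"));
            appender.activateOptions();
            Logger.getRootLogger().addAppender(appender);
        }
    }
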
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 98f63a1..3cc1b60 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -26,15 +26,14 @@ import java.net.UnknownHostException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
-
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Map.Entry;
-
 import java.util.Set;
+import java.util.Objects;
+import java.util.Map.Entry;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -108,6 +107,7 @@ public class NSparkExecutable extends AbstractExecutable {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeInstance cube = cubeMgr.getCube(this.getCubeName());
         KylinConfig config = cube.getConfig();
+        this.setLogPath(getSparkDriverLogHdfsPath(context.getConfig()));
         config = wrapConfig(config);
 
         String sparkHome = KylinConfig.getSparkHome();
@@ -189,15 +189,12 @@ public class NSparkExecutable extends AbstractExecutable {
 
     /**
      * generate the spark driver log hdfs path format, json path + timestamp + .log
-     *
-     * @param
-     * @return
      */
-    /*public String getSparkDriverLogHdfsPath(KylinConfig config) {
-        return String.format("%s.%s.log", config.getJobTmpOutputStorePath(getProject(), getId()),
+    public String getSparkDriverLogHdfsPath(KylinConfig config) {
+        return String.format(Locale.ROOT, "%s.%s.log", config.getJobOutputStorePath(getParam(MetadataConstants.P_PROJECT_NAME), getId()),
                 System.currentTimeMillis());
-    }*/
-    
+    }
+
     protected KylinConfig wrapConfig(ExecutableContext context) {
         return wrapConfig(context.getConfig());
     }
@@ -206,15 +203,18 @@ public class NSparkExecutable extends AbstractExecutable {
         String project = getParam(MetadataConstants.P_PROJECT_NAME);
         Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
 
+        HashMap<String, String> jobOverrides = new HashMap();
         String parentId = getParentId();
-        originalConfig.setProperty("job.id", StringUtils.defaultIfBlank(parentId, getId()));
-        originalConfig.setProperty("job.project", project);
+        jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
+        jobOverrides.put("job.project", project);
         if (StringUtils.isNotBlank(parentId)) {
-            originalConfig.setProperty("job.stepId", getId());
+            jobOverrides.put("job.stepId", getId());
         }
-        originalConfig.setProperty("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("spark.driver.log4j.appender.hdfs.File",
+                Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
 
-        return originalConfig;
+        return KylinConfigExt.createInstance(originalConfig, jobOverrides);
     }
 
     private void killOrphanApplicationIfExists(KylinConfig config, String jobId) {
@@ -302,27 +302,64 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
+        replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride);
+        return sparkConfigOverride;
+    }
+
+    private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) {
+        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
+        StringBuilder sb = new StringBuilder();
+        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
+            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        }
+
         String serverIp = "127.0.0.1";
         try {
             serverIp = InetAddress.getLocalHost().getHostAddress();
         } catch (UnknownHostException e) {
             logger.warn("use the InetAddress get local ip failed!", e);
         }
+        String serverPort = config.getServerPort();
 
-        String log4jConfiguration = "file:" + config.getLogSparkPropertiesFile();
+        String hdfsWorkingDir = config.getHdfsWorkingDirectory();
 
-        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
-        StringBuilder sb = new StringBuilder();
-        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
-            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        String sparkDriverHdfsLogPath = null;
+        if (config instanceof KylinConfigExt) {
+            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+            if (Objects.nonNull(extendedOverrides)) {
+                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+            }
         }
 
+        /*
+        if (config.isCloud()) {
+            String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory();
+            if (StringUtils.isNotBlank(logLocalWorkingDirectory)) {
+                hdfsWorkingDir = logLocalWorkingDirectory;
+                sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
+            }
+        }
+         */
+
+        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
         sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
+        sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
+
+        if (config.isKerberosEnabled()) {
+            sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
+            sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath()));
+            if (config.getPlatformZKEnable()) {
+                sb.append(String.format(Locale.ROOT, " -Djava.security.auth.login.config=%s", config.getKerberosJaasConfPath()));
+                sb.append(String.format(Locale.ROOT, " -Djava.security.krb5.conf=%s", config.getKerberosKrb5ConfPath()));
+            }
+        }
+        sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
         sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
-        return sparkConfigOverride;
     }
 
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
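
Put together, replaceSparkDriverJavaOpsConfIfNeeded leaves spark.driver.extraJavaOptions looking
roughly like the example below (shown one flag per line for readability; every path, host, port
and id is a placeholder, and the kerberos flags are only appended when kerberos is enabled):

    -Dlog4j.configuration=file:${KYLIN_HOME}/conf/spark-driver-log4j.properties
    -Dkylin.kerberos.enabled=false
    -Dkylin.hdfs.working.dir=hdfs://nameservice1/kylin
    -Dspark.driver.log4j.appender.hdfs.File=hdfs://nameservice1/kylin/learn_kylin/job_tmp/f1a2b3c4-step-01.1606454567000.log
    -Dspark.driver.rest.server.ip=10.1.2.3
    -Dspark.driver.rest.server.port=7070
    -Dspark.driver.param.taskId=f1a2b3c4-step-01
    -Dspark.driver.local.logDir=${KYLIN_HOME}/logs/spark
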
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
index 8607348..dba8132 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
@@ -50,6 +50,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.ResponseStatus;
 
+import static org.apache.kylin.shaded.com.google.common.net.HttpHeaders.CONTENT_DISPOSITION;
+
 /**
  */
 public class BasicController {
@@ -132,6 +134,20 @@ public class BasicController {
         }
     }
 
+    protected void setDownloadResponse(InputStream inputStream, String fileName, String contentType,
+                                       final HttpServletResponse response) throws IOException {
+        try (OutputStream output = response.getOutputStream()) {
+            response.reset();
+            response.setContentType(contentType);
+            response.setHeader(CONTENT_DISPOSITION, "attachment; filename=\"" + fileName + "\"");
+            IOUtils.copyLarge(inputStream, output);
+            output.flush();
+        } catch (IOException e) {
+            logger.error("Failed download log File!");
+            throw e;
+        }
+    }
+
     public boolean isAdmin() {
         boolean isAdmin = false;
         Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 65c036c..c28362c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.rest.controller;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Locale;
 
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -30,16 +32,22 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.MediaType;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import javax.servlet.http.HttpServletResponse;
 
 @Controller
 @RequestMapping(value = "jobs")
@@ -160,6 +168,7 @@ public class JobController extends BasicController {
      * @return
      * @throws IOException
      */
+
     @RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
             "application/json" })
     @ResponseBody
@@ -167,11 +176,34 @@ public class JobController extends BasicController {
         Map<String, String> result = new HashMap<String, String>();
         result.put("jobId", jobId);
         result.put("stepId", String.valueOf(stepId));
-        result.put("cmd_output", jobService.getExecutableManager().getOutput(stepId).getVerboseMsg());
+        result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
         return result;
     }
 
     /**
+     * Download a job step's full output from HDFS
+     * @param jobId
+     * @param stepId
+     * @param project
+     * @param response
+     * @return
+     */
+    @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
+    @ResponseBody
+    public EnvelopeResponse<String> downloadLogFile(@PathVariable("job_id") String jobId,
+                                                    @PathVariable("step_id") String stepId, @RequestParam(value = "project") String project,
+                                                    HttpServletResponse response) throws IOException {
+        checkRequiredArg("job_id", jobId);
+        checkRequiredArg("step_id", stepId);
+        checkRequiredArg("project", project);
+        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+
+        String jobOutput = jobService.getAllJobOutput(jobId, stepId);
+        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+        return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+    }
+
+    /**
      * Resume a cube job
      * 
      * @return
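
Once this is applied, the complete step log can also be fetched directly over HTTP. Assuming the
usual /kylin/api prefix in front of this controller, the request and the resulting attachment
look roughly like:

    GET /kylin/api/jobs/{job_id}/steps/{step_id}/log?project={project}
    => returns the full output as an attachment named {project}_{step_id}.log
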
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index f7be7d1..2c0c20c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -64,6 +64,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -479,6 +480,16 @@ public class JobService extends BasicService implements InitializingBean {
         return getExecutableManager().getOutput(id);
     }
 
+    public String getJobOutput(String jobId, String stepId) {
+        ExecutableManager executableManager = getExecutableManager();
+        return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
+    }
+
+    public String getAllJobOutput(String jobId, String stepId) {
+        ExecutableManager executableManager = getExecutableManager();
+        return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
+    }
+
     protected JobInstance getSingleJobInstance(AbstractExecutable job) {
         Message msg = MsgPicker.getMsg();
 


[kylin] 06/06: KYLIN-4825 Add yarn tracking url on job step page

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 51ff5ec9c6959bd5238c579573494bc4b07c2095
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Nov 25 20:00:08 2020 +0800

    KYLIN-4825 Add yarn tracking url on job step page
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../kylin/job/execution/ExecutableManager.java     |   4 +-
 .../engine/spark/application/SparkApplication.java | 106 +++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  15 +--
 .../kylin/rest/controller/JobController.java       |  18 +++-
 .../kylin/rest/request/SparkJobUpdateRequest.java  |  54 +++++++++++
 .../org/apache/kylin/rest/service/JobService.java  |  13 +++
 server/src/main/resources/kylinSecurity.xml        |   1 +
 8 files changed, 198 insertions(+), 17 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 964eb64..8f2bf8e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2648,6 +2648,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.driver-memory-base", "1024"));
     }
 
+    public boolean isTrackingUrlIpAddressEnabled() {
+        return Boolean.valueOf(this.getOptional("kylin.job.tracking-url-ip-address-enabled", TRUE));
+    }
+
     //Auto adjust the memory of driver
     public int[] getSparkEngineDriverMemoryStrategy() {
         String[] dft = {"2", "20", "100"};
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 42a9c99..0bd7b6e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -672,6 +672,7 @@ public class ExecutableManager {
                             + newStatus + ", job id: " + jobId);
                 }
                 jobOutput.setStatus(newStatus.toString());
+                logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
             }
             if (info != null) {
                 jobOutput.setInfo(info);
@@ -683,7 +684,6 @@ public class ExecutableManager {
                 jobOutput.setContent(output);
             }
             executableDao.updateJobOutput(jobOutput);
-            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
 
             if (needDestroyProcess(oldStatus, newStatus)) {
                 logger.debug("need kill {}, from {} to {}", jobId, oldStatus, newStatus);
@@ -695,7 +695,7 @@ public class ExecutableManager {
             throw new RuntimeException(e);
         }
 
-        if (project != null) {
+        if (project != null && logPath != null) {
             updateJobOutputToHDFS(project, jobId, output, logPath);
         }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 84737cb..98dde2b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -18,6 +18,14 @@
 
 package org.apache.kylin.engine.spark.application;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.engine.spark.job.BuildJobInfos;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
@@ -26,8 +34,15 @@ import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -111,6 +126,93 @@ public abstract class SparkApplication {
         return YarnInfoFetcherUtils.getTrackingUrl(yarnAppId);
     }
 
+    /**
+     * send an http request to the spark job controller on the job server
+     */
+    public Boolean updateSparkJobInfo(String url, String json) {
+        String serverIp = System.getProperty("spark.driver.rest.server.ip", "127.0.0.1");
+        String port = System.getProperty("spark.driver.rest.server.port", "7070");
+        String requestApi = String.format(Locale.ROOT, "http://%s:%s" + url, serverIp, port);
+
+        try {
+            DefaultHttpClient httpClient = new DefaultHttpClient();
+            HttpPut httpPut = new HttpPut(requestApi);
+            httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+            httpPut.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
+
+            HttpResponse response = httpClient.execute(httpPut);
+            int code = response.getStatusLine().getStatusCode();
+            if (code == HttpStatus.SC_OK) {
+                return true;
+            } else {
+                InputStream inputStream = response.getEntity().getContent();
+                String responseContent = IOUtils.toString(inputStream);
+                logger.warn("update spark job failed, info: {}", responseContent);
+            }
+        } catch (IOException e) {
+            logger.error("http request {} failed!", requestApi, e);
+        }
+        return false;
+    }
+
+    /**
+     * update spark job extra info, like the yarn application tracking url
+     */
+    public Boolean updateSparkJobExtraInfo(String url, String project, String jobId, Map<String, String> extraInfo) {
+        Map<String, String> payload = new HashMap<>(5);
+        payload.put("project", project);
+        payload.put("taskId", System.getProperty("spark.driver.param.taskId", jobId));
+        payload.putAll(extraInfo);
+
+        try {
+            String payloadJson = JsonUtil.writeValueAsString(payload);
+            int retry = 3;
+            for (int i = 0; i < retry; i++) {
+                if (updateSparkJobInfo(url, payloadJson)) {
+                    return Boolean.TRUE;
+                }
+                Thread.sleep(3000);
+                logger.warn("retry request rest api update spark extra job info");
+            }
+        } catch (Exception e) {
+            logger.error("update spark job extra info failed!", e);
+        }
+
+        return Boolean.FALSE;
+    }
+
+    private String tryReplaceHostAddress(String url) {
+        String originHost = null;
+        try {
+            URI uri = URI.create(url);
+            originHost = uri.getHost();
+            String hostAddress = InetAddress.getByName(originHost).getHostAddress();
+            return url.replace(originHost, hostAddress);
+        } catch (UnknownHostException uhe) {
+            logger.error("failed to get the ip address of " + originHost + ", step back to use the origin tracking url.", uhe);
+            return url;
+        }
+    }
+
+    private Map<String, String> getTrackingInfo(boolean ipAddressPreferred) {
+        String applicationId = ss.sparkContext().applicationId();
+        Map<String, String> extraInfo = new HashMap<>();
+        try {
+            String trackingUrl = getTrackingUrl(applicationId);
+            if (StringUtils.isBlank(trackingUrl)) {
+                logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
+                return extraInfo;
+            }
+            if (ipAddressPreferred) {
+                trackingUrl = tryReplaceHostAddress(trackingUrl);
+            }
+            extraInfo.put("yarnAppUrl", trackingUrl);
+        } catch (Exception e) {
+            logger.error("get tracking url failed!", e);
+        }
+        return extraInfo;
+    }
+
 
     final protected void execute() throws Exception {
         String hdfsMetalUrl = getParam(MetadataConstants.P_DIST_META_URL);
@@ -180,6 +282,10 @@ public abstract class SparkApplication {
             }).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
                     .getOrCreate();
 
+            if (isJobOnCluster(sparkConf)) {
+                updateSparkJobExtraInfo("/kylin/api/jobs/spark", project, jobId,
+                        getTrackingInfo(config.isTrackingUrlIpAddressEnabled()));
+            }
             // for spark metrics
             //JobMetricsUtils.registerListener(ss);
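
The call above is a plain REST round trip back to the job server: the driver reads the
spark.driver.rest.server.ip and spark.driver.rest.server.port system properties (set through the
driver JVM options in NSparkExecutable) and PUTs a small JSON document to the new endpoint. With
placeholder values, the request looks roughly like:

    PUT http://10.1.2.3:7070/kylin/api/jobs/spark
    Content-Type: application/json

    {"project": "learn_kylin",
     "taskId": "f1a2b3c4-step-01",
     "yarnAppUrl": "http://10.1.2.4:8088/proxy/application_1606454567000_0001/"}

kylinSecurity.xml opens /api/jobs/spark with permitAll so this call does not need an authenticated
session, and JobController forwards the payload to JobService.updateSparkJobInfo, which stores the
tracking url as extra job info.
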
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7cd178c..393c10d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -59,7 +59,6 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -254,24 +253,12 @@ public class NSparkExecutable extends AbstractExecutable {
                                          String kylinJobJar, String appArgs, String jobId) {
         PatternedLogger patternedLogger;
         if (config.isJobLogPrintEnabled()) {
-            patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
-                @Override
-                public void onLogEvent(String infoKey, Map<String, String> info) {
-                    // only care three properties here
-                    if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
-                        getManager().addJobInfo(getId(), info);
-                    }
-                }
-            });
+            patternedLogger = new PatternedLogger(logger);
         } else {
             patternedLogger = new PatternedLogger(null);
         }
         try {
             String cmd = generateSparkCmd(config, hadoopConf, jars, kylinJobJar, appArgs);
-            patternedLogger.log("cmd: ");
-            patternedLogger.log(cmd);
 
             CliCommandExecutor exec = new CliCommandExecutor();
             exec.execute(cmd, patternedLogger, jobId);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index cd63ac7..6b60326 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -34,6 +34,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.request.SparkJobUpdateRequest;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.JobService;
@@ -45,9 +46,11 @@ import org.springframework.http.MediaType;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.PutMapping;
 
 import javax.servlet.http.HttpServletResponse;
 
@@ -199,6 +202,19 @@ public class JobController extends BasicController {
     }
 
     /**
+     * RPC Call
+     */
+    @PutMapping(value = "/spark")
+    @ResponseBody
+    public EnvelopeResponse<String> updateSparkJobInfo(@RequestBody SparkJobUpdateRequest sparkJobUpdateRequest) {
+        jobService.updateSparkJobInfo(sparkJobUpdateRequest.getProject(),
+                sparkJobUpdateRequest.getTaskId(),
+                sparkJobUpdateRequest.getYarnAppUrl());
+
+        return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+    }
+
+    /**
      * Resume a cube job
      * 
      * @return
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java
new file mode 100644
index 0000000..ce4147f
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class SparkJobUpdateRequest {
+    @JsonProperty
+    private String project;
+    @JsonProperty
+    private String taskId;
+    @JsonProperty
+    private String yarnAppUrl;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getYarnAppUrl() {
+        return yarnAppUrl;
+    }
+
+    public void setYarnAppUrl(String yarnAppUrl) {
+        this.yarnAppUrl = yarnAppUrl;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 90ee782..e543c22 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -33,6 +33,7 @@ import java.util.TimeZone;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
@@ -54,6 +55,7 @@ import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JobSearchResult;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.SchedulerFactory;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
@@ -467,6 +469,17 @@ public class JobService extends BasicService implements InitializingBean {
         }
     }
 
+    /**
+     * update the spark job yarnAppUrl.
+     */
+    public void updateSparkJobInfo(String project, String taskId, String yarnAppUrl) {
+        ExecutableManager executableManager = getExecutableManager();
+        Map<String, String> extraInfo = Maps.newHashMap();
+        extraInfo.put(ExecutableConstants.YARN_APP_URL, yarnAppUrl);
+
+        executableManager.updateJobOutput(project, taskId, null, extraInfo, null, null);
+    }
+
     public JobInstance getJobInstance(String uuid) {
         AbstractExecutable job = getExecutableManager().getJob(uuid);
         if (job instanceof CheckpointExecutable) {
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 0003a5f..f73c595 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -249,6 +249,7 @@
             <scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()"/>
+            <scr:intercept-url pattern="/api/jobs/spark" access="permitAll"/>
             <scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/>
             <scr:intercept-url pattern="/api/admin/version" access="permitAll"/>


[kylin] 04/06: KYLIN-4813 Add download all log link in front-end

Posted by xx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c4d3168268f189aaec4e7406d08dcc10fffb2888
Author: zl03 <zh...@163.com>
AuthorDate: Mon Nov 23 15:37:54 2020 +0800

    KYLIN-4813 Add download all log link in front-end
---
 .../engine/spark/common/logging/SparkExecutorHdfsAppender.java |  4 ++--
 webapp/app/js/controllers/job.js                               | 10 +++++++++-
 webapp/app/partials/jobs/job_steps.html                        |  1 +
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index 381fd2d..d8b250f 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -42,8 +42,8 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
 
     private static final long A_DAY_MILLIS = 24 * 60 * 60 * 1000L;
     private static final long A_HOUR_MILLIS = 60 * 60 * 1000L;
-    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault());
-    private SimpleDateFormat hourFormat = new SimpleDateFormat("HH");
+    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
+    private SimpleDateFormat hourFormat = new SimpleDateFormat("HH", Locale.ROOT);
 
     @VisibleForTesting
     String outPutPath;
diff --git a/webapp/app/js/controllers/job.js b/webapp/app/js/controllers/job.js
index a2fed31..a36a049 100644
--- a/webapp/app/js/controllers/job.js
+++ b/webapp/app/js/controllers/job.js
@@ -354,6 +354,9 @@ KylinApp
                     },
                     attr: function () {
                         return $scope.state.stepAttrToShow;
+                    },
+                    job: function () {
+                        return $scope.state.selectedJob;
                     }
                 }
             });
@@ -361,10 +364,15 @@ KylinApp
     }
 );
 
-var jobStepDetail = function ($scope, $modalInstance, step, attr) {
+var jobStepDetail = function ($scope, $modalInstance, $window, step, attr, job) {
     $scope.step = step;
     $scope.stepAttrToShow = attr;
+    $scope.job = job;
     $scope.cancel = function () {
         $modalInstance.dismiss('cancel');
     }
+    $scope.downloadAllLogs = function () {
+        var downloadUrl = Config.service.url + 'jobs/'+ job.uuid +'/steps/'+ step.id +'/log' + '?project=' + job.projectName;
+        $window.open(downloadUrl);
+    }
 };
diff --git a/webapp/app/partials/jobs/job_steps.html b/webapp/app/partials/jobs/job_steps.html
index 993b302..13516c1 100644
--- a/webapp/app/partials/jobs/job_steps.html
+++ b/webapp/app/partials/jobs/job_steps.html
@@ -161,6 +161,7 @@
     <div class="modal-header">
         <h4>{{stepAttrToShow == 'cmd' ? 'Parameters' : 'Output'}}
             <i ng-if="stepAttrToShow == 'output' && step.loadingOp" class="fa fa-spinner fa-spin"></i></h4>
+            The output log shows the first and last 100 lines by default. To view the full output, please click to <a ng-click="downloadAllLogs()">download the log file</a>.
     </div>
     <div class="modal-body">
         <pre>{{stepAttrToShow == 'cmd' ? step.exec_cmd : (step.loadingOp ? "Loading..." : !!(step.cmd_output)?step.cmd_output:'No Data Available')}}</pre>