Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:48 UTC

[kylin] 01/06: KYLIN-4813 Refactor spark build log

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2dca9ef9a52cf1adb12684e0ba576b91e96b3792
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Nov 12 17:50:47 2020 +0800

    KYLIN-4813 Refactor spark build log
---
 .../engine/mr/common/MapReduceExecutable.java      |   7 +-
 build/bin/kylin-port-replace-util.sh               |   1 +
 build/conf/kylin-parquet-log4j.properties          |  33 --
 build/conf/kylin-spark-log4j.properties            |  43 ---
 build/conf/spark-driver-log4j.properties           |  45 +++
 .../org/apache/kylin/common/KylinConfigBase.java   |  26 +-
 .../common/logging/SensitivePatternLayout.java     |  62 ++++
 .../src/main/resources/kylin-defaults.properties   |   3 +
 .../apache/kylin/job/dao/ExecutableOutputPO.java   |  11 +
 .../kylin/job/execution/AbstractExecutable.java    |  19 +-
 .../job/execution/DefaultChainedExecutable.java    |  18 +-
 .../kylin/job/execution/ExecutableManager.java     | 263 +++++++++++++-
 .../apache/kylin/job/ExecutableManagerTest.java    |  20 +-
 .../job/impl/threadpool/DefaultSchedulerTest.java  |   4 +-
 .../common/logging/AbstractHdfsLogAppender.java    | 377 +++++++++++++++++++++
 .../common/logging/SparkDriverHdfsLogAppender.java | 112 ++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  81 +++--
 .../kylin/rest/controller/BasicController.java     |  16 +
 .../kylin/rest/controller/JobController.java       |  34 +-
 .../org/apache/kylin/rest/service/JobService.java  |  11 +
 20 files changed, 1044 insertions(+), 142 deletions(-)

diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index a33f171..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -57,6 +57,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,7 +86,7 @@ public class MapReduceExecutable extends AbstractExecutable {
         if (output.getExtra().containsKey(START_TIME)) {
             final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
             if (mrJobId == null) {
-                getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
                 return;
             }
             try {
@@ -96,7 +97,7 @@ public class MapReduceExecutable extends AbstractExecutable {
                     //remove previous mr job info
                     super.onExecuteStart(executableContext);
                 } else {
-                    getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+                    getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
                 }
             } catch (IOException | ParseException e) {
                 logger.warn("error get hadoop status");
@@ -180,7 +181,7 @@ public class MapReduceExecutable extends AbstractExecutable {
 
                 JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
                 if (status == JobStepStatusEnum.KILLED) {
-                    mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
+                    mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin", null);
                     if (isDiscarded()) {
                         if (getIsNeedLock()) {
                             releaseLock(lock);
diff --git a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh
index ea96791..12a2abd 100755
--- a/build/bin/kylin-port-replace-util.sh
+++ b/build/bin/kylin-port-replace-util.sh
@@ -96,6 +96,7 @@ then
     sed -i "s/^kylin\.stream\.node=.*$/$stream_node/g" ${KYLIN_CONFIG_FILE}
 
     sed -i "s/#*kylin.server.cluster-servers=\(.*\).*:\(.*\)/kylin.server.cluster-servers=\1:${new_kylin_port}/g" ${KYLIN_CONFIG_FILE}
+    sed -i "s/#*server.port=.*$/server.port=${new_kylin_port}/g" ${KYLIN_CONFIG_FILE}
 
     echo "New kylin port is : ${new_kylin_port}"
 
diff --git a/build/conf/kylin-parquet-log4j.properties b/build/conf/kylin-parquet-log4j.properties
deleted file mode 100644
index 36b7dd4..0000000
--- a/build/conf/kylin-parquet-log4j.properties
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#  
-#     http://www.apache.org/licenses/LICENSE-2.0
-#  
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-#overall config
-log4j.rootLogger=WARN,stdout
-log4j.logger.org.apache.kylin=DEBUG
-log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=WARN
-log4j.logger.org.apache.spark=WARN
-# For the purpose of getting Tracking URL
-log4j.logger.org.apache.spark.deploy.yarn=INFO
-log4j.logger.org.apache.spark.ContextCleaner=WARN
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.err
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
\ No newline at end of file
diff --git a/build/conf/kylin-spark-log4j.properties b/build/conf/kylin-spark-log4j.properties
deleted file mode 100644
index 948fb32..0000000
--- a/build/conf/kylin-spark-log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-log4j.rootCategory=WARN,stderr,stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark-project.jetty=WARN
-log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
-log4j.logger.org.apache.parquet=ERROR
-log4j.logger.parquet=ERROR
-
-# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
-log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
-log4j.logger.org.apache.spark.sql=WARN
-
-log4j.logger.org.apache.kylin=DEBUG
\ No newline at end of file
diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
new file mode 100644
index 0000000..8b0e82d
--- /dev/null
+++ b/build/conf/spark-driver-log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#overall config
+log4j.rootLogger=INFO,hdfs
+log4j.logger.org.apache.kylin=DEBUG
+log4j.logger.org.springframework=WARN
+log4j.logger.org.springframework.security=WARN
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark.ContextCleaner=WARN
+
+# hdfs file appender
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
+log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled}
+log4j.appender.hdfs.kerberosPrincipal=${kylin.kerberos.principal}
+log4j.appender.hdfs.kerberosKeytab=${kylin.kerberos.keytab}
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.logPath=${spark.driver.log4j.appender.hdfs.File}
+log4j.appender.hdfs.logQueueCapacity=5000
+#flushInterval is in milliseconds
+log4j.appender.hdfs.flushInterval=5000
+log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+log4j.appender.logFile=org.apache.log4j.FileAppender
+log4j.appender.logFile.Threshold=DEBUG
+log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log
+log4j.appender.logFile.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.logFile.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
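
Note on the ${...} placeholders above: log4j 1.x resolves them from Java system properties when the
configuration is parsed, so the Spark driver JVM has to be launched with matching -D options
(presumably wired up in NSparkExecutable, which this commit also changes). A minimal sketch of that
resolution, using hypothetical values:

    // Hypothetical values; in a real build they are injected at spark-submit time.
    System.setProperty("kylin.kerberos.enabled", "false");
    System.setProperty("kylin.hdfs.working.dir", "hdfs://ns/kylin");
    System.setProperty("spark.driver.log4j.appender.hdfs.File",
            "hdfs://ns/kylin/learn_kylin/spark_logs/driver/<jobId>/driver.log");
    // log4j 1.x substitutes ${...} from system properties while parsing the file.
    org.apache.log4j.PropertyConfigurator.configure("spark-driver-log4j.properties");
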
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8e53a9b..081d23f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -258,6 +258,8 @@ public abstract class KylinConfigBase implements Serializable {
 
     final protected void reloadKylinConfig(Properties properties) {
         this.properties = BCC.check(properties);
+        setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
+        setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
     }
 
     private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -465,6 +467,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getMetadataUrl().getIdentifier();
     }
 
+    public String getServerPort() {
+        return getOptional("server.port", "7070");
+    }
+
     public Map<String, String> getResourceStoreImpls() {
         Map<String, String> r = Maps.newLinkedHashMap();
         // ref constants in ISourceAware
@@ -866,6 +872,14 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.job.log-dir", "/tmp/kylin/logs");
     }
 
+    public String getKylinLogDir() {
+        String kylinHome = getKylinHome();
+        if (kylinHome == null) {
+            kylinHome = System.getProperty("KYLIN_HOME");
+        }
+        return kylinHome + File.separator + "logs";
+    }
+
     public boolean getRunAsRemoteCommand() {
         return Boolean.parseBoolean(getOptional("kylin.job.use-remote-cli"));
     }
@@ -2664,6 +2678,10 @@ public abstract class KylinConfigBase implements Serializable {
         return getHdfsWorkingDirectory() + project + "/job_tmp/";
     }
 
+    public String getSparkLogDir(String project) {
+        return getHdfsWorkingDirectory() + project + "/spark_logs/driver/";
+    }
+
     @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
     public int getPersistFlatTableThreshold() {
         return Integer.parseInt(getOptional("kylin.engine.persist-flattable-threshold", "1"));
@@ -2807,8 +2825,8 @@ public abstract class KylinConfigBase implements Serializable {
         return Boolean.parseBoolean(getOptional("kylin.query.spark-engine.enabled", "true"));
     }
 
-    public String getLogSparkPropertiesFile() {
-        return getLogPropertyFile("kylin-parquet-log4j.properties");
+    public String getLogSparkDriverPropertiesFile() {
+        return getLogPropertyFile("spark-driver-log4j.properties");
     }
 
     private String getLogPropertyFile(String filename) {
@@ -2875,6 +2893,10 @@ public abstract class KylinConfigBase implements Serializable {
                 .parseBoolean(this.getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true"));
     }
 
+    public String getJobOutputStorePath(String project, String jobId) {
+        return getSparkLogDir(project) + getNestedPath(jobId) + "execute_output.json";
+    }
+
     public int getBaseShufflePartitionSize() {
         return Integer.parseInt(this.getOptional("kylin.query.pushdown.base-shuffle-partition-size", "48"));
     }
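
Taken together, the new getters define a per-project layout under the HDFS working directory for
driver logs and persisted job output. A short sketch with hypothetical values (getNestedPath is not
part of this diff, so its segment is left opaque):

    KylinConfig config = KylinConfig.getInstanceFromEnv();
    // e.g. hdfs://ns/kylin/learn_kylin/spark_logs/driver/
    String sparkLogDir = config.getSparkLogDir("learn_kylin");
    // getSparkLogDir(project) + getNestedPath(jobId) + "execute_output.json"
    String outputPath = config.getJobOutputStorePath("learn_kylin", jobId);
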
diff --git a/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
new file mode 100644
index 0000000..013d42d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.logging;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SensitivePatternLayout extends PatternLayout {
+    private static final String PREFIX_GROUP_NAME = "prefix";
+    private static final String SENSITIVE_GROUP_NAME = "sensitive";
+    private static final String MASK = "******";
+    private static final Pattern SENSITIVE_PATTERN = Pattern.compile(
+            String.format(Locale.ROOT, "(?<%s>password\\s*[:=])(?<%s>[^,.!]*)", PREFIX_GROUP_NAME, SENSITIVE_GROUP_NAME),
+            Pattern.CASE_INSENSITIVE);
+
+    @Override
+    public String format(LoggingEvent event) {
+        if (event.getMessage() instanceof String) {
+            String maskedMessage = mask(event.getRenderedMessage());
+
+            Throwable throwable = event.getThrowableInformation() != null
+                    ? event.getThrowableInformation().getThrowable()
+                    : null;
+            LoggingEvent maskedEvent = new LoggingEvent(event.fqnOfCategoryClass,
+                    Logger.getLogger(event.getLoggerName()), event.timeStamp, event.getLevel(), maskedMessage,
+                    throwable);
+
+            return super.format(maskedEvent);
+        }
+        return super.format(event);
+    }
+
+    private String mask(String message) {
+        Matcher matcher = SENSITIVE_PATTERN.matcher(message);
+        if (matcher.find()) {
+            return matcher.replaceAll(String.format(Locale.ROOT, "${%s}%s", PREFIX_GROUP_NAME, MASK));
+        }
+        return message;
+    }
+}
+
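
A small standalone sketch (not part of the patch) of what the masking regex does: everything after a
"password:" or "password=" prefix, up to the next ',', '.' or '!', is replaced with a fixed mask,
while the prefix itself is kept through the named group reference.

    import java.util.regex.Pattern;

    public class MaskDemo {
        public static void main(String[] args) {
            Pattern p = Pattern.compile("(?<prefix>password\\s*[:=])(?<sensitive>[^,.!]*)",
                    Pattern.CASE_INSENSITIVE);
            String in = "connect with user=ADMIN, password=KYLIN, url=jdbc:mysql://host/kylin";
            // ${prefix} re-inserts the matched "password=" while the value is masked
            System.out.println(p.matcher(in).replaceAll("${prefix}******"));
            // prints: connect with user=ADMIN, password=******, url=jdbc:mysql://host/kylin
        }
    }
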
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index eb8398b..3d8268f 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -44,6 +44,9 @@ kylin.env.zookeeper-connect-string=sandbox.hortonworks.com
 # Kylin server mode, valid value [all, query, job]
 kylin.server.mode=all
 
+## Kylin server port
+server.port=7070
+
 # List of web servers in use, this enables one web server instance to sync up with other servers.
 kylin.server.cluster-servers=localhost:7070
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
index fdc6cc4..27a2831 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -38,6 +38,9 @@ public class ExecutableOutputPO extends RootPersistentEntity {
     @JsonProperty("status")
     private String status = "READY";
 
+    @JsonProperty("log_path")
+    private String logPath;
+
     @JsonProperty("info")
     private Map<String, String> info = Maps.newHashMap();
 
@@ -64,4 +67,12 @@ public class ExecutableOutputPO extends RootPersistentEntity {
     public void setInfo(Map<String, String> info) {
         this.info = info;
     }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public String getLogPath() {
+        return logPath;
+    }
 }
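
The new log_path field travels in the same JSON document that ExecutableManager mirrors to HDFS
(see updateJobOutputToHDFS further below). A small sketch, with a hypothetical path, of how the
field is set and what the annotations shown above imply about the serialized form:

    ExecutableOutputPO jobOutput = new ExecutableOutputPO();
    jobOutput.setLogPath("hdfs://ns/kylin/learn_kylin/spark_logs/driver/<jobId>/driver.log");
    // With the annotations above, the persisted JSON contains at least:
    //   {"status":"READY", "log_path":"hdfs://...driver.log", "info":{}, ...}
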
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 202d22e..d815e5b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -82,6 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     private Map<String, String> params = Maps.newHashMap();
     protected Integer priority;
     private CubeBuildTypeEnum jobType;
+    private String logPath;
     protected String project;
     private String targetSubject;
     private List<String> targetSegments = Lists.newArrayList();//uuid of related segments
@@ -95,6 +96,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         this.config = config;
     }
 
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public String getLogPath() {
+        return logPath;
+    }
+
     protected KylinConfig getConfig() {
         return config;
     }
@@ -107,7 +116,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         checkJobPaused();
         Map<String, String> info = Maps.newHashMap();
         info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-        getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+        getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath());
     }
 
     public KylinConfig getCubeSpecificConfig() {
@@ -151,9 +160,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         setEndTime(System.currentTimeMillis());
         if (!isDiscarded() && !isRunnable()) {
             if (result.succeed()) {
-                getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, result.output(), getLogPath());
             } else {
-                getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+                getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath());
             }
         }
     }
@@ -168,7 +177,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                 exception.printStackTrace(new PrintWriter(out));
                 output = out.toString();
             }
-            getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output);
+            getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, output, getLogPath());
         }
     }
 
@@ -587,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     public final String getProject() {
         if (project == null) {
-            throw new IllegalStateException("project is not set for abstract executable " + getId());
+            logger.error("project is not set for abstract executable " + getId());
         }
         return project;
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index bbeaf6a..c0f6d87 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -101,11 +101,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     protected void onExecuteStart(ExecutableContext executableContext) {
         final long startTime = getStartTime();
         if (startTime > 0) {
-            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+            getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
         } else {
             Map<String, String> info = Maps.newHashMap();
             info.put(START_TIME, Long.toString(System.currentTimeMillis()));
-            getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+            getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath());
         }
         getManager().addJobInfo(getId(), BUILD_INSTANCE, DistributedLockFactory.processAndHost());
     }
@@ -137,8 +137,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
                             "There shouldn't be a running subtask[jobId: {}, jobName: {}], \n"
                                     + "it might cause endless state, will retry to fetch subtask's state.",
                             task.getId(), task.getName());
-                    getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null,
-                            "killed due to inconsistent state");
+                    getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.ERROR, null,
+                            "killed due to inconsistent state", getLogPath());
                     hasError = true;
                 }
 
@@ -156,21 +156,21 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
             }
             if (allSucceed) {
                 setEndTime(System.currentTimeMillis());
-                mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, null, getLogPath());
                 onStatusChange(executableContext, result, ExecutableState.SUCCEED);
             } else if (hasError) {
                 setEndTime(System.currentTimeMillis());
-                mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, null, getLogPath());
                 onStatusChange(executableContext, result, ExecutableState.ERROR);
             } else if (hasDiscarded) {
                 setEndTime(System.currentTimeMillis());
-                mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.DISCARDED, null, null, getLogPath());
             } else {
-                mgr.updateJobOutput(getId(), ExecutableState.READY, null, null);
+                mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.READY, null, null, getLogPath());
             }
         } else {
             setEndTime(System.currentTimeMillis());
-            mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+            mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath());
             onStatusChange(executableContext, result, ExecutableState.ERROR);
         }
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index ecc2c2e..c9ae8a4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -23,7 +23,12 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
 import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
 import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
 
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.IllegalFormatException;
@@ -31,18 +36,28 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Deque;
+import java.util.ArrayDeque;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.JobProcessContext;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.dao.ExecutableDao;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
 import org.apache.kylin.job.dao.ExecutablePO;
 import org.apache.kylin.job.exception.IllegalStateTranferException;
 import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,6 +255,197 @@ public class ExecutableManager {
         return result;
     }
 
+    public Output getOutputFromHDFSByJobId(String jobId) {
+        return getOutputFromHDFSByJobId(jobId, jobId);
+    }
+
+    public Output getOutputFromHDFSByJobId(String jobId, String stepId) {
+        return getOutputFromHDFSByJobId(jobId, stepId, 100);
+    }
+
+    /**
+     * Get the job output from its JSON file on HDFS.
+     * If the JSON file contains a logPath (the HDFS path of the Spark driver log, *.json.log),
+     * read sample data from that log file as the output content.
+     *
+     * @param jobId
+     * @return
+     */
+    public Output getOutputFromHDFSByJobId(String jobId, String stepId, int nLines) {
+        AbstractExecutable jobInstance = getJob(jobId);
+        String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId);
+        ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath);
+        assertOutputNotNull(jobOutput, outputStorePath);
+
+        if (Objects.nonNull(jobOutput.getLogPath())) {
+            if (isHdfsPathExists(jobOutput.getLogPath())) {
+                jobOutput.setContent(getSampleDataFromHDFS(jobOutput.getLogPath(), nLines));
+            } else if (StringUtils.isEmpty(jobOutput.getContent()) && Objects.nonNull(getJob(jobId))
+                    && getJob(jobId).getStatus() == ExecutableState.RUNNING) {
+                jobOutput.setContent("Wait a moment ... ");
+            }
+        }
+
+        return parseOutput(jobOutput);
+    }
+
+    public ExecutableOutputPO getJobOutputFromHDFS(String resPath) {
+        DataInputStream din = null;
+        try {
+            Path path = new Path(resPath);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            if (!fs.exists(path)) {
+                ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+                executableOutputPO.setContent("job output not found, please check kylin.log");
+                return executableOutputPO;
+            }
+
+            din = fs.open(path);
+            return JsonUtil.readValue(din, ExecutableOutputPO.class);
+        } catch (Exception e) {
+            // If the output file on hdfs is corrupt, give an empty output
+            logger.error("get job output [{}] from HDFS failed.", resPath, e);
+            ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+            executableOutputPO.setContent("job output broken, please check kylin.log");
+            return executableOutputPO;
+        } finally {
+            IOUtils.closeQuietly(din);
+        }
+    }
+
+    private void assertOutputNotNull(ExecutableOutputPO output, String idOrPath) {
+        com.google.common.base.Preconditions.checkArgument(output != null, "there is no related output for job :" + idOrPath);
+    }
+
+    /**
+     * Check whether the HDFS path exists.
+     *
+     * @param hdfsPath
+     * @return
+     */
+    public boolean isHdfsPathExists(String hdfsPath) {
+        if (StringUtils.isBlank(hdfsPath)) {
+            return false;
+        }
+
+        Path path = new Path(hdfsPath);
+        FileSystem fs = HadoopUtil.getWorkingFileSystem();
+        try {
+            return fs.exists(path);
+        } catch (IOException e) {
+            logger.error("check the hdfs path [{}] exists failed, ", hdfsPath, e);
+        }
+
+        return false;
+    }
+
+    /**
+     * Get sample data from an HDFS log file.
+     * Given a line count nLines, return the first nLines lines and the last nLines lines.
+     *
+     * @param resPath
+     * @return
+     */
+    public String getSampleDataFromHDFS(String resPath, final int nLines) {
+        try {
+            Path path = new Path(resPath);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            if (!fs.exists(path)) {
+                return null;
+            }
+
+            FileStatus fileStatus = fs.getFileStatus(path);
+            try (FSDataInputStream din = fs.open(path);
+                 BufferedReader reader = new BufferedReader(new InputStreamReader(din, "UTF-8"))) {
+
+                String line;
+                StringBuilder sampleData = new StringBuilder();
+                for (int i = 0; i < nLines && (line = reader.readLine()) != null; i++) {
+                    if (sampleData.length() > 0) {
+                        sampleData.append('\n');
+                    }
+                    sampleData.append(line);
+                }
+
+                int offset = sampleData.toString().getBytes("UTF-8").length + 1;
+                if (offset < fileStatus.getLen()) {
+                    sampleData.append("\n================================================================\n");
+                    sampleData.append(tailHdfsFileInputStream(din, offset, fileStatus.getLen(), nLines));
+                }
+                return sampleData.toString();
+            }
+        } catch (IOException e) {
+            logger.error("get sample data from hdfs log file [{}] failed!", resPath, e);
+            return null;
+        }
+    }
+
+    /**
+     * Get the last nLines lines from the end of an HDFS file input stream;
+     * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
+     *
+     * @param hdfsDin
+     * @param startPos
+     * @param endPos
+     * @param nLines
+     * @return
+     * @throws IOException
+     */
+    private String tailHdfsFileInputStream(FSDataInputStream hdfsDin, final long startPos, final long endPos,
+                                           final int nLines) throws IOException {
+        com.google.common.base.Preconditions.checkNotNull(hdfsDin);
+        com.google.common.base.Preconditions.checkArgument(startPos < endPos && startPos >= 0);
+        com.google.common.base.Preconditions.checkArgument(nLines >= 0);
+
+        Deque<String> deque = new ArrayDeque<>();
+        int buffSize = 8192;
+        byte[] byteBuf = new byte[buffSize];
+
+        long pos = endPos;
+
+        // the log normally ends with '\n'; skip the trailing newline so it is not counted as a line
+        hdfsDin.seek(pos - 1);
+        int lastChar = hdfsDin.read();
+        if ('\n' == lastChar) {
+            pos--;
+        }
+
+        int bytesRead = (int) ((pos - startPos) % buffSize);
+        if (bytesRead == 0) {
+            bytesRead = buffSize;
+        }
+
+        pos -= bytesRead;
+        int lines = nLines;
+        while (lines > 0 && pos >= startPos) {
+            bytesRead = hdfsDin.read(pos, byteBuf, 0, bytesRead);
+
+            int last = bytesRead;
+            for (int i = bytesRead - 1; i >= 0 && lines > 0; i--) {
+                if (byteBuf[i] == '\n') {
+                    deque.push(new String(byteBuf, i, last - i, StandardCharsets.UTF_8));
+                    lines--;
+                    last = i;
+                }
+            }
+
+            if (lines > 0 && last > 0) {
+                deque.push(new String(byteBuf, 0, last, StandardCharsets.UTF_8));
+            }
+
+            bytesRead = buffSize;
+            pos -= bytesRead;
+        }
+
+        StringBuilder sb = new StringBuilder();
+        while (!deque.isEmpty()) {
+            sb.append(deque.pop());
+        }
+
+        return sb.length() > 0 && sb.charAt(0) == '\n' ? sb.substring(1) : sb.toString();
+    }
+
+
     public List<AbstractExecutable> getAllExecutables() {
         try {
             List<AbstractExecutable> ret = Lists.newArrayList();
@@ -342,12 +548,12 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (task.getStatus() == ExecutableState.RUNNING) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, null, task.getLogPath());
                     break;
                 }
             }
         }
-        updateJobOutput(jobId, ExecutableState.READY, null, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, null, null, job.getLogPath());
     }
 
     public void resumeJob(String jobId) {
@@ -360,7 +566,7 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (task.getStatus() == ExecutableState.ERROR || task.getStatus() == ExecutableState.STOPPED) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, "no output");
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, "no output", task.getLogPath());
                     break;
                 }
             }
@@ -372,7 +578,7 @@ public class ExecutableManager {
                 info.remove(AbstractExecutable.END_TIME);
             }
         }
-        updateJobOutput(jobId, ExecutableState.READY, info, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, info, null, job.getLogPath());
     }
 
     public void discardJob(String jobId) {
@@ -394,11 +600,11 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.DISCARDED, null, null, task.getLogPath());
                 }
             }
         }
-        updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.DISCARDED, null, null, job.getLogPath());
     }
 
     public void rollbackJob(String jobId, String stepId) {
@@ -412,13 +618,13 @@ public class ExecutableManager {
             for (AbstractExecutable task : tasks) {
                 if (task.getId().compareTo(stepId) >= 0) {
                     logger.debug("rollback task : " + task);
-                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath());
                 }
             }
         }
 
         if (job.getStatus() == ExecutableState.SUCCEED) {
-            updateJobOutput(job.getId(), ExecutableState.READY, null, null);
+            updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), job.getId(), ExecutableState.READY, null, null, job.getLogPath());
         }
     }
 
@@ -439,12 +645,12 @@ public class ExecutableManager {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
                 if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null);
+                    updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.STOPPED, null, null, task.getLogPath());
                     break;
                 }
             }
         }
-        updateJobOutput(jobId, ExecutableState.STOPPED, null, null);
+        updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.STOPPED, null, null, job.getLogPath());
     }
 
     public ExecutableOutputPO getJobOutput(String jobId) {
@@ -456,7 +662,7 @@ public class ExecutableManager {
         }
     }
 
-    public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
+    public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) {
         // when 
         if (Thread.currentThread().isInterrupted()) {
             throw new RuntimeException("Current thread is interruptted, aborting");
@@ -494,6 +700,39 @@ public class ExecutableManager {
             logger.error("error change job:" + jobId + " to " + newStatus);
             throw new RuntimeException(e);
         }
+
+        if (project != null) {
+            //write output to HDFS
+            updateJobOutputToHDFS(project, jobId, output, logPath);
+        }
+    }
+
+    public void updateJobOutputToHDFS(String project, String jobId, String output, String logPath) {
+        ExecutableOutputPO jobOutput = getJobOutput(jobId);
+        if (null != output) {
+            jobOutput.setContent(output);
+        }
+        if (null != logPath) {
+            jobOutput.setLogPath(logPath);
+        }
+        String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId);
+
+        updateJobOutputToHDFS(outputHDFSPath, jobOutput);
+    }
+
+    public void updateJobOutputToHDFS(String resPath, ExecutableOutputPO obj) {
+        DataOutputStream dout = null;
+        try {
+            Path path = new Path(resPath);
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            dout = fs.create(path, true);
+            JsonUtil.writeValue(dout, obj);
+        } catch (Exception e) {
+            // the operation to update output to hdfs failed, next task should not be interrupted.
+            logger.error("update job output [{}] to HDFS failed.", resPath, e);
+        } finally {
+            IOUtils.closeQuietly(dout);
+        }
     }
 
     private boolean needDestroyProcess(ExecutableState from, ExecutableState to) {
@@ -548,7 +787,7 @@ public class ExecutableManager {
                 if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
                     continue;
                 } else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
+                    updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null);
                 }
                 break;
             }
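
Putting the ExecutableManager changes together: updateJobOutput now takes a project and a log path
and, when the project is known, mirrors the output to execute_output.json on HDFS, while
getOutputFromHDFSByJobId reads that file back and samples the head and tail of the driver log it
references. A minimal usage sketch with hypothetical ids (the Output accessor is an assumption,
marked in the comments):

    ExecutableManager mgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());

    // Persists the state change and also writes
    // <working-dir>/<project>/spark_logs/driver/.../execute_output.json
    mgr.updateJobOutput("learn_kylin", jobId, ExecutableState.RUNNING, null, null, driverLogPath);

    // Reads the persisted output back; the content is the first 100 and last 100 lines
    // of the driver log referenced by log_path.
    Output stepOutput = mgr.getOutputFromHDFSByJobId(jobId, stepId, 100);
    String logSample = stepOutput.getVerboseMsg(); // assumption: content exposed as the verbose message
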
diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
index 3535d37..3341dff 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
@@ -75,7 +75,7 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
         AbstractExecutable another = service.getJob(executable.getId());
         assertJobEqual(executable, another);
 
-        service.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, "test output");
+        service.updateJobOutput(null, executable.getId(), ExecutableState.RUNNING, null, "test output", null);
         assertJobEqual(executable, service.getJob(executable.getId()));
     }
 
@@ -98,21 +98,21 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
         SucceedTestExecutable job = new SucceedTestExecutable();
         String id = job.getId();
         service.addJob(job);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.ERROR, null, null);
-        service.updateJobOutput(id, ExecutableState.READY, null, null);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.READY, null, null);
-        service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
-        service.updateJobOutput(id, ExecutableState.SUCCEED, null, null);
+        service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.ERROR, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.READY, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.READY, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+        service.updateJobOutput(null, id, ExecutableState.SUCCEED, null, null, null);
     }
 
     @Test(expected = IllegalStateTranferException.class)
     public void testInvalidStateTransfer() {
         SucceedTestExecutable job = new SucceedTestExecutable();
         service.addJob(job);
-        service.updateJobOutput(job.getId(), ExecutableState.ERROR, null, null);
-        service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null);
+        service.updateJobOutput(null, job.getId(), ExecutableState.ERROR, null, null, null);
+        service.updateJobOutput(null, job.getId(), ExecutableState.STOPPED, null, null, null);
     }
 
     private static void assertJobEqual(Executable one, Executable another) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 0855012..71fc19f 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -145,8 +145,8 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
         job.addTask(task1);
         job.addTask(task2);
         execMgr.addJob(job);
-        ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(task2.getId(),
-                ExecutableState.RUNNING, null, null);
+        ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(null, task2.getId(),
+                ExecutableState.RUNNING, null, null, null);
         waitForJobFinish(job.getId(), MAX_WAIT_TIME);
         Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState());
         Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
new file mode 100644
index 0000000..5a90fb2
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
+    private final Object flushLogLock = new Object();
+    private final Object initWriterLock = new Object();
+    private final Object closeLock = new Object();
+    private final Object fileSystemLock = new Object();
+
+    private FSDataOutputStream outStream = null;
+    private BufferedWriter bufferedWriter = null;
+
+    private FileSystem fileSystem = null;
+
+    private ExecutorService appendHdfsService = null;
+
+    private BlockingDeque<LoggingEvent> logBufferQue = null;
+    private static final double QUEUE_FLUSH_THRESHOLD = 0.2;
+
+    //configurable
+    private int logQueueCapacity = 8192;
+    private int flushInterval = 5000;
+    private String hdfsWorkingDir;
+
+    public int getLogQueueCapacity() {
+        return logQueueCapacity;
+    }
+
+    public void setLogQueueCapacity(int logQueueCapacity) {
+        this.logQueueCapacity = logQueueCapacity;
+    }
+
+    public BlockingDeque<LoggingEvent> getLogBufferQue() {
+        return logBufferQue;
+    }
+
+    public int getFlushInterval() {
+        return flushInterval;
+    }
+
+    public void setFlushInterval(int flushInterval) {
+        this.flushInterval = flushInterval;
+    }
+
+    public String getHdfsWorkingDir() {
+        return hdfsWorkingDir;
+    }
+
+    public void setHdfsWorkingDir(String hdfsWorkingDir) {
+        this.hdfsWorkingDir = hdfsWorkingDir;
+    }
+
+    public FileSystem getFileSystem() {
+        if (null == fileSystem) {
+            return getFileSystem(new Configuration());
+        }
+        return fileSystem;
+    }
+
+    private FileSystem getFileSystem(Configuration conf) {
+        synchronized (fileSystemLock) {
+            if (null == fileSystem) {
+                try {
+                    fileSystem = new Path(hdfsWorkingDir).getFileSystem(conf);
+                } catch (IOException e) {
+                    LogLog.error("Failed to create the file system, ", e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return fileSystem;
+    }
+
+    public boolean isWriterInited() {
+        synchronized (initWriterLock) {
+            return null != bufferedWriter;
+        }
+    }
+
+    abstract void init();
+
+    abstract String getAppenderName();
+
+    /**
+     * Initialize resources and start the background flush thread.
+     */
+    @Override
+    public void activateOptions() {
+        LogLog.warn(String.format(Locale.ROOT, "%s starting ...", getAppenderName()));
+        LogLog.warn("hdfsWorkingDir -> " + getHdfsWorkingDir());
+
+        init();
+
+        logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity());
+        appendHdfsService = Executors.newSingleThreadExecutor();
+        appendHdfsService.execute(this::checkAndFlushLog);
+        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+
+        LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName()));
+    }
+
+    @Override
+    public void append(LoggingEvent loggingEvent) {
+        try {
+            boolean offered = logBufferQue.offer(loggingEvent, 10, TimeUnit.SECONDS);
+            if (!offered) {
+                LogLog.error("LogEvent cannot put into the logBufferQue, log event content:");
+                printLoggingEvent(loggingEvent);
+            }
+        } catch (InterruptedException e) {
+            LogLog.warn("Append logging event interrupted!", e);
+            // Restore interrupted state...
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Override
+    public void close() {
+        synchronized (closeLock) {
+            if (!this.closed) {
+                this.closed = true;
+
+                List<LoggingEvent> transaction = Lists.newArrayList();
+                try {
+                    flushLog(getLogBufferQue().size(), transaction);
+
+                    closeWriter();
+                    if (appendHdfsService != null && !appendHdfsService.isShutdown()) {
+                        appendHdfsService.shutdownNow();
+                    }
+                } catch (Exception e) {
+                    transaction.forEach(this::printLoggingEvent);
+                    try {
+                        while (!getLogBufferQue().isEmpty()) {
+                            printLoggingEvent(getLogBufferQue().take());
+                        }
+                    } catch (Exception ie) {
+                        LogLog.error("clear the logging buffer queue failed!", ie);
+                    }
+                    LogLog.error(String.format(Locale.ROOT, "close %s failed!", getAppenderName()), e);
+                }
+                LogLog.warn(String.format(Locale.ROOT, "%s closed ...", getAppenderName()));
+            }
+        }
+    }
+
+    private void closeWriter() {
+        IOUtils.closeQuietly(bufferedWriter);
+        IOUtils.closeQuietly(outStream);
+    }
+
+    @Override
+    public boolean requiresLayout() {
+        return true;
+    }
+
+    /**
+     * Sometimes the appender must wait until the component it depends on has finished initializing.
+     *
+     * @return
+     */
+    abstract boolean isSkipCheckAndFlushLog();
+
+    /**
+     * Drop a portion of the buffered events when the queue is full.
+     */
+    private void clearLogBufferQueueWhenBlocked() {
+        if (logBufferQue.size() >= getLogQueueCapacity()) {
+            int removeNum = getLogQueueCapacity() / 5;
+            while (removeNum > 0) {
+                try {
+                    LoggingEvent loggingEvent = logBufferQue.take();
+                    printLoggingEvent(loggingEvent);
+                } catch (Exception ex) {
+                    LogLog.error("Take event interrupted!", ex);
+                }
+                removeNum--;
+            }
+        }
+    }
+
+    /**
+     * print the logging event to stderr
+     * @param loggingEvent
+     */
+    private void printLoggingEvent(LoggingEvent loggingEvent) {
+        try {
+            String log = getLayout().format(loggingEvent);
+            LogLog.error(log.endsWith("\n") ? log.substring(0, log.length() - 1) : log);
+            if (null != loggingEvent.getThrowableStrRep()) {
+                for (String stack : loggingEvent.getThrowableStrRep()) {
+                    LogLog.error(stack);
+                }
+            }
+        } catch (Exception e) {
+            LogLog.error("print logging event failed!", e);
+        }
+    }
+
+    /**
+     * flush the log to hdfs when conditions are satisfied.
+     */
+    protected void checkAndFlushLog() {
+        long start = System.currentTimeMillis();
+        do {
+            List<LoggingEvent> transaction = Lists.newArrayList();
+            try {
+                if (isSkipCheckAndFlushLog()) {
+                    continue;
+                }
+
+                int eventSize = getLogBufferQue().size();
+                if (eventSize > getLogQueueCapacity() * QUEUE_FLUSH_THRESHOLD
+                        || System.currentTimeMillis() - start > getFlushInterval()) {
+                    // update the start time before flushLog so a failed flush does not trigger an immediate retry
+                    start = System.currentTimeMillis();
+                    flushLog(eventSize, transaction);
+                } else {
+                    Thread.sleep(getFlushInterval() / 100);
+                }
+            } catch (Exception e) {
+                transaction.forEach(this::printLoggingEvent);
+                clearLogBufferQueueWhenBlocked();
+                LogLog.error("Error occurred when consume event", e);
+            }
+        } while (!closed);
+    }
+
+    /**
+     * Initialize the HDFS writer and create the HDFS file at outPath.
+     * Kerberos authentication may be required, so the FileSystem is initialized here.
+     *
+     * @param outPath the HDFS path of the log file to create
+     * @param conf the Hadoop configuration used to obtain the FileSystem
+     */
+    protected boolean initHdfsWriter(Path outPath, Configuration conf) {
+        synchronized (initWriterLock) {
+            closeWriter();
+            bufferedWriter = null;
+            outStream = null;
+
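+            // try up to 10 times to create the HDFS output stream, waiting 1 second between attempts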
+            int retry = 10;
+            while (retry-- > 0) {
+                try {
+                    fileSystem = getFileSystem(conf);
+                    outStream = fileSystem.create(outPath, true);
+                    break;
+                } catch (Exception e) {
+                    LogLog.error("Failed to create stream for path: " + outPath, e);
+                }
+
+                try {
+                    initWriterLock.wait(1000); // wait for the HDFS ACL to be granted to the current user
+                } catch (InterruptedException e) {
+                    LogLog.warn("Init writer interrupted!", e);
+                    // Restore interrupted state...
+                    Thread.currentThread().interrupt();
+                }
+            }
+            if (null != outStream) {
+                bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream, StandardCharsets.UTF_8));
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Write the message into the buffered writer.
+     *
+     * @param message the formatted log text to write
+     * @throws IOException if the write fails
+     */
+    protected void write(String message) throws IOException {
+        bufferedWriter.write(message);
+    }
+
+    /**
+     * Write the formatted logging event, including its stack trace if present, into the buffer.
+     *
+     * @param loggingEvent the event to write
+     * @throws IOException if the write fails
+     */
+    protected void writeLogEvent(LoggingEvent loggingEvent) throws IOException {
+        if (null != loggingEvent) {
+            write(getLayout().format(loggingEvent));
+
+            if (null != loggingEvent.getThrowableStrRep()) {
+                for (String message : loggingEvent.getThrowableStrRep()) {
+                    write(message);
+                    write("\n");
+                }
+            }
+        }
+    }
+
+    /**
+     * Take events from the queue and write them into the buffer.
+     *
+     * @param eventSize the number of events to write
+     * @param transaction collects the written events so they can be replayed if the flush fails
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    abstract void doWriteLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException;
+
+    /**
+     * flush the buffer data to HDFS.
+     *
+     * @throws IOException
+     */
+    private void flush() throws IOException {
+        bufferedWriter.flush();
+        outStream.hsync();
+    }
+
+    /**
+     * Take up to eventSize events from the queue and write them to HDFS immediately.
+     *
+     * @param eventSize the number of events to flush
+     * @param transaction collects the written events so they can be replayed if the flush fails
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    protected void flushLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+        if (eventSize <= 0) {
+            return;
+        }
+
+        synchronized (flushLogLock) {
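+            // never take more events than are currently buffered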
+            if (eventSize > getLogBufferQue().size()) {
+                eventSize = getLogBufferQue().size();
+            }
+
+            doWriteLog(eventSize, transaction);
+
+            flush();
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
new file mode 100644
index 0000000..0e5ad3c
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.common.logging;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SparkDriverHdfsLogAppender extends AbstractHdfsLogAppender {
+
+    private String logPath;
+
+    // kerberos
+    private boolean kerberosEnable = false;
+    private String kerberosPrincipal;
+    private String kerberosKeytab;
+
+    public String getLogPath() {
+        return logPath;
+    }
+
+    public void setLogPath(String logPath) {
+        this.logPath = logPath;
+    }
+
+    public boolean isKerberosEnable() {
+        return kerberosEnable;
+    }
+
+    public void setKerberosEnable(boolean kerberosEnable) {
+        this.kerberosEnable = kerberosEnable;
+    }
+
+    public String getKerberosPrincipal() {
+        return kerberosPrincipal;
+    }
+
+    public void setKerberosPrincipal(String kerberosPrincipal) {
+        this.kerberosPrincipal = kerberosPrincipal;
+    }
+
+    public String getKerberosKeytab() {
+        return kerberosKeytab;
+    }
+
+    public void setKerberosKeytab(String kerberosKeytab) {
+        this.kerberosKeytab = kerberosKeytab;
+    }
+
+    @Override
+    public void init() {
+        LogLog.warn("spark.driver.log4j.appender.hdfs.File -> " + getLogPath());
+        LogLog.warn("kerberosEnable -> " + isKerberosEnable());
+        if (isKerberosEnable()) {
+            LogLog.warn("kerberosPrincipal -> " + getKerberosPrincipal());
+            LogLog.warn("kerberosKeytab -> " + getKerberosKeytab());
+        }
+    }
+
+    @Override
+    String getAppenderName() {
+        return "SparkDriverHdfsLogAppender";
+    }
+
+    @Override
+    public boolean isSkipCheckAndFlushLog() {
+        return false;
+    }
+
+    @Override
+    public void doWriteLog(int eventSize, List<LoggingEvent> transaction)
+            throws IOException, InterruptedException {
+        if (!isWriterInited()) {
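+            // lazily create the HDFS writer on first use, logging in via Kerberos when enabled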
+            Configuration conf = new Configuration();
+            if (isKerberosEnable()) {
+                UserGroupInformation.setConfiguration(conf);
+                UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
+            }
+            if (!initHdfsWriter(new Path(getLogPath()), conf)) {
+                LogLog.error("Failed to init the HDFS writer!");
+            }
+        }
+
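+        // drain eventSize events from the queue, recording them in the transaction so the caller can replay them if the flush fails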
+        while (eventSize > 0) {
+            LoggingEvent loggingEvent = getLogBufferQue().take();
+            transaction.add(loggingEvent);
+            writeLogEvent(loggingEvent);
+            eventSize--;
+        }
+    }
+}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 98f63a1..3cc1b60 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -26,15 +26,14 @@ import java.net.UnknownHostException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
-
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
-import java.util.Map.Entry;
-
 import java.util.Set;
+import java.util.Objects;
+import java.util.Map.Entry;
 
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -108,6 +107,7 @@ public class NSparkExecutable extends AbstractExecutable {
         CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         CubeInstance cube = cubeMgr.getCube(this.getCubeName());
         KylinConfig config = cube.getConfig();
+        this.setLogPath(getSparkDriverLogHdfsPath(context.getConfig()));
         config = wrapConfig(config);
 
         String sparkHome = KylinConfig.getSparkHome();
@@ -189,15 +189,12 @@ public class NSparkExecutable extends AbstractExecutable {
 
     /**
      * generate the spark driver log HDFS path: job output store path + timestamp + .log
-     *
-     * @param
-     * @return
      */
-    /*public String getSparkDriverLogHdfsPath(KylinConfig config) {
-        return String.format("%s.%s.log", config.getJobTmpOutputStorePath(getProject(), getId()),
+    public String getSparkDriverLogHdfsPath(KylinConfig config) {
+        return String.format(Locale.ROOT, "%s.%s.log", config.getJobOutputStorePath(getParam(MetadataConstants.P_PROJECT_NAME), getId()),
                 System.currentTimeMillis());
-    }*/
-    
+    }
+
     protected KylinConfig wrapConfig(ExecutableContext context) {
         return wrapConfig(context.getConfig());
     }
@@ -206,15 +203,18 @@ public class NSparkExecutable extends AbstractExecutable {
         String project = getParam(MetadataConstants.P_PROJECT_NAME);
         Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
 
+        HashMap<String, String> jobOverrides = new HashMap<>();
         String parentId = getParentId();
-        originalConfig.setProperty("job.id", StringUtils.defaultIfBlank(parentId, getId()));
-        originalConfig.setProperty("job.project", project);
+        jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
+        jobOverrides.put("job.project", project);
         if (StringUtils.isNotBlank(parentId)) {
-            originalConfig.setProperty("job.stepId", getId());
+            jobOverrides.put("job.stepId", getId());
         }
-        originalConfig.setProperty("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+        jobOverrides.put("spark.driver.log4j.appender.hdfs.File",
+                Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
 
-        return originalConfig;
+        return KylinConfigExt.createInstance(originalConfig, jobOverrides);
     }
 
     private void killOrphanApplicationIfExists(KylinConfig config, String jobId) {
@@ -302,27 +302,64 @@ public class NSparkExecutable extends AbstractExecutable {
             sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
         }
 
+        replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride);
+        return sparkConfigOverride;
+    }
+
+    private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) {
+        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
+        StringBuilder sb = new StringBuilder();
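+        // keep any user-supplied driver extraJavaOptions and append Kylin's logging settings to them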
+        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
+            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        }
+
         String serverIp = "127.0.0.1";
         try {
             serverIp = InetAddress.getLocalHost().getHostAddress();
         } catch (UnknownHostException e) {
             logger.warn("use the InetAddress get local ip failed!", e);
         }
+        String serverPort = config.getServerPort();
 
-        String log4jConfiguration = "file:" + config.getLogSparkPropertiesFile();
+        String hdfsWorkingDir = config.getHdfsWorkingDirectory();
 
-        String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
-        StringBuilder sb = new StringBuilder();
-        if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
-            sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+        String sparkDriverHdfsLogPath = null;
+        if (config instanceof KylinConfigExt) {
+            Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+            if (Objects.nonNull(extendedOverrides)) {
+                sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+            }
         }
 
+        /*
+        if (config.isCloud()) {
+            String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory();
+            if (StringUtils.isNotBlank(logLocalWorkingDirectory)) {
+                hdfsWorkingDir = logLocalWorkingDirectory;
+                sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
+            }
+        }
+         */
+
+        String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
         sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
+        sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
+
+        if (config.isKerberosEnabled()) {
+            sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
+            sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath()));
+            if (config.getPlatformZKEnable()) {
+                sb.append(String.format(Locale.ROOT, " -Djava.security.auth.login.config=%s", config.getKerberosJaasConfPath()));
+                sb.append(String.format(Locale.ROOT, " -Djava.security.krb5.conf=%s", config.getKerberosKrb5ConfPath()));
+            }
+        }
+        sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
         sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-
+        sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
         sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
-        return sparkConfigOverride;
     }
 
     protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
index 8607348..dba8132 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
@@ -50,6 +50,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.bind.annotation.ResponseStatus;
 
+import static org.apache.kylin.shaded.com.google.common.net.HttpHeaders.CONTENT_DISPOSITION;
+
 /**
  */
 public class BasicController {
@@ -132,6 +134,20 @@ public class BasicController {
         }
     }
 
+    protected void setDownloadResponse(InputStream inputStream, String fileName, String contentType,
+                                       final HttpServletResponse response) throws IOException {
+        try (OutputStream output = response.getOutputStream()) {
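+            // reset the response before setting headers and streaming the file as an attachment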
+            response.reset();
+            response.setContentType(contentType);
+            response.setHeader(CONTENT_DISPOSITION, "attachment; filename=\"" + fileName + "\"");
+            IOUtils.copyLarge(inputStream, output);
+            output.flush();
+        } catch (IOException e) {
+            logger.error("Failed to download log file!");
+            throw e;
+        }
+    }
+
     public boolean isAdmin() {
         boolean isAdmin = false;
         Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 65c036c..c28362c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.rest.controller;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Locale;
 
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
@@ -30,16 +32,22 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.MediaType;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import javax.servlet.http.HttpServletResponse;
 
 @Controller
 @RequestMapping(value = "jobs")
@@ -160,6 +168,7 @@ public class JobController extends BasicController {
      * @return
      * @throws IOException
      */
+
     @RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
             "application/json" })
     @ResponseBody
@@ -167,11 +176,34 @@ public class JobController extends BasicController {
         Map<String, String> result = new HashMap<String, String>();
         result.put("jobId", jobId);
         result.put("stepId", String.valueOf(stepId));
-        result.put("cmd_output", jobService.getExecutableManager().getOutput(stepId).getVerboseMsg());
+        result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
         return result;
     }
 
     /**
+     * Download a job step's complete output log from HDFS
+     * @param jobId the job id
+     * @param stepId the step id
+     * @param project the project the job belongs to
+     * @param response servlet response used to stream the log file as an attachment
+     * @return an EnvelopeResponse indicating success
+     */
+    @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
+    @ResponseBody
+    public EnvelopeResponse<String> downloadLogFile(@PathVariable("job_id") String jobId,
+                                                    @PathVariable("step_id") String stepId, @RequestParam(value = "project") String project,
+                                                    HttpServletResponse response) throws IOException {
+        checkRequiredArg("job_id", jobId);
+        checkRequiredArg("step_id", stepId);
+        checkRequiredArg("project", project);
+        String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+
+        String jobOutput = jobService.getAllJobOutput(jobId, stepId);
+        setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+        return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+    }
+
+    /**
      * Resume a cube job
      * 
      * @return
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index f7be7d1..2c0c20c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -64,6 +64,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.CheckpointExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
 import org.apache.kylin.metadata.model.ISourceAware;
@@ -479,6 +480,16 @@ public class JobService extends BasicService implements InitializingBean {
         return getExecutableManager().getOutput(id);
     }
 
+    public String getJobOutput(String jobId, String stepId) {
+        ExecutableManager executableManager = getExecutableManager();
+        return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
+    }
+
+    public String getAllJobOutput(String jobId, String stepId) {
+        ExecutableManager executableManager = getExecutableManager();
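+        // Integer.MAX_VALUE requests the complete log so it can be downloaded as a file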
+        return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
+    }
+
     protected JobInstance getSingleJobInstance(AbstractExecutable job) {
         Message msg = MsgPicker.getMsg();