Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:48 UTC
[kylin] 01/06: KYLIN-4813 Refactor spark build log
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 2dca9ef9a52cf1adb12684e0ba576b91e96b3792
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Nov 12 17:50:47 2020 +0800
KYLIN-4813 Refactor spark build log
---
.../engine/mr/common/MapReduceExecutable.java | 7 +-
build/bin/kylin-port-replace-util.sh | 1 +
build/conf/kylin-parquet-log4j.properties | 33 --
build/conf/kylin-spark-log4j.properties | 43 ---
build/conf/spark-driver-log4j.properties | 45 +++
.../org/apache/kylin/common/KylinConfigBase.java | 26 +-
.../common/logging/SensitivePatternLayout.java | 62 ++++
.../src/main/resources/kylin-defaults.properties | 3 +
.../apache/kylin/job/dao/ExecutableOutputPO.java | 11 +
.../kylin/job/execution/AbstractExecutable.java | 19 +-
.../job/execution/DefaultChainedExecutable.java | 18 +-
.../kylin/job/execution/ExecutableManager.java | 263 +++++++++++++-
.../apache/kylin/job/ExecutableManagerTest.java | 20 +-
.../job/impl/threadpool/DefaultSchedulerTest.java | 4 +-
.../common/logging/AbstractHdfsLogAppender.java | 377 +++++++++++++++++++++
.../common/logging/SparkDriverHdfsLogAppender.java | 112 ++++++
.../kylin/engine/spark/job/NSparkExecutable.java | 81 +++--
.../kylin/rest/controller/BasicController.java | 16 +
.../kylin/rest/controller/JobController.java | 34 +-
.../org/apache/kylin/rest/service/JobService.java | 11 +
20 files changed, 1044 insertions(+), 142 deletions(-)
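The core API change, threaded through all the hunks below, is that ExecutableManager.updateJobOutput(...) now also takes the project name and an HDFS log path, so every status update can be mirrored to an execute_output.json file on HDFS. The call shape before and after (taken from the hunks that follow):

    // before: job output lives only in the metadata store
    getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);

    // after: project + logPath let the manager also write the output to HDFS
    getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(),
            ExecutableState.RUNNING, null, null, getLogPath());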
diff --git a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
index a33f171..4978fa0 100755
--- a/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
+++ b/build-engine/src/main/java/org/apache/kylin/engine/mr/common/MapReduceExecutable.java
@@ -57,6 +57,7 @@ import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,7 +86,7 @@ public class MapReduceExecutable extends AbstractExecutable {
if (output.getExtra().containsKey(START_TIME)) {
final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
if (mrJobId == null) {
- getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
return;
}
try {
@@ -96,7 +97,7 @@ public class MapReduceExecutable extends AbstractExecutable {
//remove previous mr job info
super.onExecuteStart(executableContext);
} else {
- getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
}
} catch (IOException | ParseException e) {
logger.warn("error get hadoop status");
@@ -180,7 +181,7 @@ public class MapReduceExecutable extends AbstractExecutable {
JobStepStatusEnum newStatus = HadoopJobStatusChecker.checkStatus(job, output);
if (status == JobStepStatusEnum.KILLED) {
- mgr.updateJobOutput(getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin");
+ mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, hadoopCmdOutput.getInfo(), "killed by admin", null);
if (isDiscarded()) {
if (getIsNeedLock()) {
releaseLock(lock);
diff --git a/build/bin/kylin-port-replace-util.sh b/build/bin/kylin-port-replace-util.sh
index ea96791..12a2abd 100755
--- a/build/bin/kylin-port-replace-util.sh
+++ b/build/bin/kylin-port-replace-util.sh
@@ -96,6 +96,7 @@ then
sed -i "s/^kylin\.stream\.node=.*$/$stream_node/g" ${KYLIN_CONFIG_FILE}
sed -i "s/#*kylin.server.cluster-servers=\(.*\).*:\(.*\)/kylin.server.cluster-servers=\1:${new_kylin_port}/g" ${KYLIN_CONFIG_FILE}
+ sed -i "s/#*server.port=.*$/server.port=${new_kylin_port}/g" ${KYLIN_CONFIG_FILE}
echo "New kylin port is : ${new_kylin_port}"
diff --git a/build/conf/kylin-parquet-log4j.properties b/build/conf/kylin-parquet-log4j.properties
deleted file mode 100644
index 36b7dd4..0000000
--- a/build/conf/kylin-parquet-log4j.properties
+++ /dev/null
@@ -1,33 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-#overall config
-log4j.rootLogger=WARN,stdout
-log4j.logger.org.apache.kylin=DEBUG
-log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=WARN
-log4j.logger.org.apache.spark=WARN
-# For the purpose of getting Tracking URL
-log4j.logger.org.apache.spark.deploy.yarn=INFO
-log4j.logger.org.apache.spark.ContextCleaner=WARN
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.Target=System.err
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
\ No newline at end of file
diff --git a/build/conf/kylin-spark-log4j.properties b/build/conf/kylin-spark-log4j.properties
deleted file mode 100644
index 948fb32..0000000
--- a/build/conf/kylin-spark-log4j.properties
+++ /dev/null
@@ -1,43 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-log4j.rootCategory=WARN,stderr,stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.target=System.out
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-
-log4j.appender.stderr=org.apache.log4j.ConsoleAppender
-log4j.appender.stderr.Target=System.err
-log4j.appender.stderr.layout=org.apache.log4j.PatternLayout
-log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
-
-
-# Settings to quiet third party logs that are too verbose
-log4j.logger.org.spark-project.jetty=WARN
-log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
-log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
-log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
-log4j.logger.org.apache.parquet=ERROR
-log4j.logger.parquet=ERROR
-
-# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
-log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
-log4j.logger.org.apache.spark.sql=WARN
-
-log4j.logger.org.apache.kylin=DEBUG
\ No newline at end of file
diff --git a/build/conf/spark-driver-log4j.properties b/build/conf/spark-driver-log4j.properties
new file mode 100644
index 0000000..8b0e82d
--- /dev/null
+++ b/build/conf/spark-driver-log4j.properties
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#overall config
+log4j.rootLogger=INFO,hdfs
+log4j.logger.org.apache.kylin=DEBUG
+log4j.logger.org.springframework=WARN
+log4j.logger.org.springframework.security=WARN
+log4j.logger.org.apache.spark=WARN
+log4j.logger.org.apache.spark.ContextCleaner=WARN
+
+# hdfs file appender
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender
+log4j.appender.hdfs.kerberosEnable=${kylin.kerberos.enabled}
+log4j.appender.hdfs.kerberosPrincipal=${kylin.kerberos.principal}
+log4j.appender.hdfs.kerberosKeytab=${kylin.kerberos.keytab}
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.logPath=${spark.driver.log4j.appender.hdfs.File}
+log4j.appender.hdfs.logQueueCapacity=5000
+#flushPeriod count as millis
+log4j.appender.hdfs.flushInterval=5000
+log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+log4j.appender.logFile=org.apache.log4j.FileAppender
+log4j.appender.logFile.Threshold=DEBUG
+log4j.appender.logFile.File=${spark.driver.local.logDir}/${spark.driver.param.taskId}.log
+log4j.appender.logFile.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.logFile.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
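The ${...} placeholders above are resolved from JVM system properties when the Spark driver starts; the engine passes them via spark.driver.extraJavaOptions (see the NSparkExecutable hunks below). A hedged sketch of the -D flags that feed them; every right-hand value here is an illustrative placeholder, not engine code:

    // illustrative only: the real values are computed by the engine at submit time
    Map<String, String> sparkConf = new HashMap<>();
    sparkConf.put("spark.driver.extraJavaOptions", String.join(" ",
            "-Dlog4j.configuration=file:/path/to/spark-driver-log4j.properties",
            "-Dkylin.kerberos.enabled=false",
            "-Dkylin.hdfs.working.dir=hdfs://host/kylin",
            "-Dspark.driver.log4j.appender.hdfs.File=hdfs://host/kylin/proj/spark_logs/driver/<job-id>/execute_output.json.1605174647000.log"));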
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 8e53a9b..081d23f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -258,6 +258,8 @@ public abstract class KylinConfigBase implements Serializable {
final protected void reloadKylinConfig(Properties properties) {
this.properties = BCC.check(properties);
+ setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
+ setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
}
private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -465,6 +467,10 @@ public abstract class KylinConfigBase implements Serializable {
return getMetadataUrl().getIdentifier();
}
+ public String getServerPort() {
+ return getOptional("server.port", "7070");
+ }
+
public Map<String, String> getResourceStoreImpls() {
Map<String, String> r = Maps.newLinkedHashMap();
// ref constants in ISourceAware
@@ -866,6 +872,14 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.job.log-dir", "/tmp/kylin/logs");
}
+ public String getKylinLogDir() {
+ String kylinHome = getKylinHome();
+ if (kylinHome == null) {
+ kylinHome = System.getProperty("KYLIN_HOME");
+ }
+ return kylinHome + File.separator + "logs";
+ }
+
public boolean getRunAsRemoteCommand() {
return Boolean.parseBoolean(getOptional("kylin.job.use-remote-cli"));
}
@@ -2664,6 +2678,10 @@ public abstract class KylinConfigBase implements Serializable {
return getHdfsWorkingDirectory() + project + "/job_tmp/";
}
+ public String getSparkLogDir(String project) {
+ return getHdfsWorkingDirectory() + project + "/spark_logs/driver/";
+ }
+
@ConfigTag(ConfigTag.Tag.NOT_CLEAR)
public int getPersistFlatTableThreshold() {
return Integer.parseInt(getOptional("kylin.engine.persist-flattable-threshold", "1"));
@@ -2807,8 +2825,8 @@ public abstract class KylinConfigBase implements Serializable {
return Boolean.parseBoolean(getOptional("kylin.query.spark-engine.enabled", "true"));
}
- public String getLogSparkPropertiesFile() {
- return getLogPropertyFile("kylin-parquet-log4j.properties");
+ public String getLogSparkDriverPropertiesFile() {
+ return getLogPropertyFile("spark-driver-log4j.properties");
}
private String getLogPropertyFile(String filename) {
@@ -2875,6 +2893,10 @@ public abstract class KylinConfigBase implements Serializable {
.parseBoolean(this.getOptional("kylin.query.pushdown.auto-set-shuffle-partitions-enabled", "true"));
}
+ public String getJobOutputStorePath(String project, String jobId) {
+ return getSparkLogDir(project) + getNestedPath(jobId) + "execute_output.json";
+ }
+
public int getBaseShufflePartitionSize() {
return Integer.parseInt(this.getOptional("kylin.query.pushdown.base-shuffle-partition-size", "48"));
}
diff --git a/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
new file mode 100644
index 0000000..013d42d
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/logging/SensitivePatternLayout.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.common.logging;
+
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.util.Locale;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class SensitivePatternLayout extends PatternLayout {
+ private static final String PREFIX_GROUP_NAME = "prefix";
+ private static final String SENSITIVE_GROUP_NAME = "sensitive";
+ private static final String MASK = "******";
+ private static final Pattern SENSITIVE_PATTERN = Pattern.compile(
+ String.format(Locale.ROOT, "(?<%s>password\\s*[:=])(?<%s>[^,.!]*)", PREFIX_GROUP_NAME, SENSITIVE_GROUP_NAME),
+ Pattern.CASE_INSENSITIVE);
+
+ @Override
+ public String format(LoggingEvent event) {
+ if (event.getMessage() instanceof String) {
+ String maskedMessage = mask(event.getRenderedMessage());
+
+ Throwable throwable = event.getThrowableInformation() != null
+ ? event.getThrowableInformation().getThrowable()
+ : null;
+ LoggingEvent maskedEvent = new LoggingEvent(event.fqnOfCategoryClass,
+ Logger.getLogger(event.getLoggerName()), event.timeStamp, event.getLevel(), maskedMessage,
+ throwable);
+
+ return super.format(maskedEvent);
+ }
+ return super.format(event);
+ }
+
+ private String mask(String message) {
+ Matcher matcher = SENSITIVE_PATTERN.matcher(message);
+ if (matcher.find()) {
+ return matcher.replaceAll(String.format(Locale.ROOT, "${%s}%s", PREFIX_GROUP_NAME, MASK));
+ }
+ return message;
+ }
+}
+
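The masking behavior of the new layout can be checked in isolation; a minimal, self-contained sketch (regex copied from the class above, sample input invented):

    import java.util.regex.Pattern;

    public class MaskDemo {
        public static void main(String[] args) {
            Pattern p = Pattern.compile(
                    "(?<prefix>password\\s*[:=])(?<sensitive>[^,.!]*)",
                    Pattern.CASE_INSENSITIVE);
            String in = "user=ADMIN, password=KYLIN123, url=jdbc:mysql://host/db";
            // masking stops at the first ',', '.' or '!' after the password
            System.out.println(p.matcher(in).replaceAll("${prefix}******"));
            // -> user=ADMIN, password=******, url=jdbc:mysql://host/db
        }
    }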
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index eb8398b..3d8268f 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -44,6 +44,9 @@ kylin.env.zookeeper-connect-string=sandbox.hortonworks.com
# Kylin server mode, valid value [all, query, job]
kylin.server.mode=all
+## Kylin server port
+server.port=7070
+
# List of web servers in use, this enables one web server instance to sync up with other servers.
kylin.server.cluster-servers=localhost:7070
diff --git a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
index fdc6cc4..27a2831 100644
--- a/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ b/core-job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
@@ -38,6 +38,9 @@ public class ExecutableOutputPO extends RootPersistentEntity {
@JsonProperty("status")
private String status = "READY";
+ @JsonProperty("log_path")
+ private String logPath;
+
@JsonProperty("info")
private Map<String, String> info = Maps.newHashMap();
@@ -64,4 +67,12 @@ public class ExecutableOutputPO extends RootPersistentEntity {
public void setInfo(Map<String, String> info) {
this.info = info;
}
+
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
+ public String getLogPath() {
+ return logPath;
+ }
}
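With the new log_path field, the execute_output.json written to HDFS carries a pointer to the Spark driver log. A small serialization sketch using Jackson's ObjectMapper directly (the manager itself goes through JsonUtil, as shown further below; the path value is illustrative):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.kylin.job.dao.ExecutableOutputPO;

    public class OutputJsonDemo {
        public static void main(String[] args) throws Exception {
            ExecutableOutputPO out = new ExecutableOutputPO();
            out.setContent("build finished");
            // illustrative path; real ones come from getSparkDriverLogHdfsPath(...)
            out.setLogPath("hdfs://host/kylin/demo/spark_logs/driver/<job-id>/execute_output.json.1605174647000.log");
            // the serialized form now carries "log_path" alongside "status", "info" and "content"
            System.out.println(new ObjectMapper().writeValueAsString(out));
        }
    }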
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 202d22e..d815e5b 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -82,6 +82,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
private Map<String, String> params = Maps.newHashMap();
protected Integer priority;
private CubeBuildTypeEnum jobType;
+ private String logPath;
protected String project;
private String targetSubject;
private List<String> targetSegments = Lists.newArrayList();//uuid of related segments
@@ -95,6 +96,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
this.config = config;
}
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
+ public String getLogPath() {
+ return logPath;
+ }
+
protected KylinConfig getConfig() {
return config;
}
@@ -107,7 +116,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
checkJobPaused();
Map<String, String> info = Maps.newHashMap();
info.put(START_TIME, Long.toString(System.currentTimeMillis()));
- getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath());
}
public KylinConfig getCubeSpecificConfig() {
@@ -151,9 +160,9 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
setEndTime(System.currentTimeMillis());
if (!isDiscarded() && !isRunnable()) {
if (result.succeed()) {
- getManager().updateJobOutput(getId(), ExecutableState.SUCCEED, null, result.output());
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, result.output(), getLogPath());
} else {
- getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath());
}
}
}
@@ -168,7 +177,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
exception.printStackTrace(new PrintWriter(out));
output = out.toString();
}
- getManager().updateJobOutput(getId(), ExecutableState.ERROR, null, output);
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, output, getLogPath());
}
}
@@ -587,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
public final String getProject() {
if (project == null) {
- throw new IllegalStateException("project is not set for abstract executable " + getId());
+ logger.error("project is not set for abstract executable " + getId());
}
return project;
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index bbeaf6a..c0f6d87 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -101,11 +101,11 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
protected void onExecuteStart(ExecutableContext executableContext) {
final long startTime = getStartTime();
if (startTime > 0) {
- getManager().updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, null, null, getLogPath());
} else {
Map<String, String> info = Maps.newHashMap();
info.put(START_TIME, Long.toString(System.currentTimeMillis()));
- getManager().updateJobOutput(getId(), ExecutableState.RUNNING, info, null);
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.RUNNING, info, null, getLogPath());
}
getManager().addJobInfo(getId(), BUILD_INSTANCE, DistributedLockFactory.processAndHost());
}
@@ -137,8 +137,8 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
"There shouldn't be a running subtask[jobId: {}, jobName: {}], \n"
+ "it might cause endless state, will retry to fetch subtask's state.",
task.getId(), task.getName());
- getManager().updateJobOutput(task.getId(), ExecutableState.ERROR, null,
- "killed due to inconsistent state");
+ getManager().updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.ERROR, null,
+ "killed due to inconsistent state", getLogPath());
hasError = true;
}
@@ -156,21 +156,21 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
}
if (allSucceed) {
setEndTime(System.currentTimeMillis());
- mgr.updateJobOutput(getId(), ExecutableState.SUCCEED, null, null);
+ mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.SUCCEED, null, null, getLogPath());
onStatusChange(executableContext, result, ExecutableState.SUCCEED);
} else if (hasError) {
setEndTime(System.currentTimeMillis());
- mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, null);
+ mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, null, getLogPath());
onStatusChange(executableContext, result, ExecutableState.ERROR);
} else if (hasDiscarded) {
setEndTime(System.currentTimeMillis());
- mgr.updateJobOutput(getId(), ExecutableState.DISCARDED, null, null);
+ mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.DISCARDED, null, null, getLogPath());
} else {
- mgr.updateJobOutput(getId(), ExecutableState.READY, null, null);
+ mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.READY, null, null, getLogPath());
}
} else {
setEndTime(System.currentTimeMillis());
- mgr.updateJobOutput(getId(), ExecutableState.ERROR, null, result.output());
+ mgr.updateJobOutput(getParam(MetadataConstants.P_PROJECT_NAME), getId(), ExecutableState.ERROR, null, result.output(), getLogPath());
onStatusChange(executableContext, result, ExecutableState.ERROR);
}
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index ecc2c2e..c9ae8a4 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -23,7 +23,12 @@ import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_ID;
import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;
import static org.apache.kylin.job.constant.ExecutableConstants.FLINK_JOB_ID;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.IllegalFormatException;
@@ -31,18 +36,28 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
+import java.util.Deque;
+import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.JobProcessContext;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.dao.ExecutableDao;
import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.exception.IllegalStateTranferException;
import org.apache.kylin.job.exception.PersistentException;
+import org.apache.kylin.metadata.MetadataConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -240,6 +255,197 @@ public class ExecutableManager {
return result;
}
+ public Output getOutputFromHDFSByJobId(String jobId) {
+ return getOutputFromHDFSByJobId(jobId, jobId);
+ }
+
+ public Output getOutputFromHDFSByJobId(String jobId, String stepId) {
+ return getOutputFromHDFSByJobId(jobId, stepId, 100);
+ }
+
+ /**
+ * Get the job output from its json file on HDFS;
+ * if the json file contains a logPath, that path points to the spark driver
+ * log on HDFS (*.json.log), and sample data is read from that log file.
+ *
+ * @param jobId
+ * @return
+ */
+ public Output getOutputFromHDFSByJobId(String jobId, String stepId, int nLines) {
+ AbstractExecutable jobInstance = getJob(jobId);
+ String outputStorePath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(jobInstance.getParam(MetadataConstants.P_PROJECT_NAME), stepId);
+ ExecutableOutputPO jobOutput = getJobOutputFromHDFS(outputStorePath);
+ assertOutputNotNull(jobOutput, outputStorePath);
+
+ if (Objects.nonNull(jobOutput.getLogPath())) {
+ if (isHdfsPathExists(jobOutput.getLogPath())) {
+ jobOutput.setContent(getSampleDataFromHDFS(jobOutput.getLogPath(), nLines));
+ } else if (StringUtils.isEmpty(jobOutput.getContent()) && Objects.nonNull(getJob(jobId))
+ && getJob(jobId).getStatus() == ExecutableState.RUNNING) {
+ jobOutput.setContent("Wait a moment ... ");
+ }
+ }
+
+ return parseOutput(jobOutput);
+ }
+
+ public ExecutableOutputPO getJobOutputFromHDFS(String resPath) {
+ DataInputStream din = null;
+ try {
+ Path path = new Path(resPath);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ if (!fs.exists(path)) {
+ ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+ executableOutputPO.setContent("job output not found, please check kylin.log");
+ return executableOutputPO;
+ }
+
+ din = fs.open(path);
+ return JsonUtil.readValue(din, ExecutableOutputPO.class);
+ } catch (Exception e) {
+ // If the output file on hdfs is corrupt, give an empty output
+ logger.error("get job output [{}] from HDFS failed.", resPath, e);
+ ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
+ executableOutputPO.setContent("job output broken, please check kylin.log");
+ return executableOutputPO;
+ } finally {
+ IOUtils.closeQuietly(din);
+ }
+ }
+
+ private void assertOutputNotNull(ExecutableOutputPO output, String idOrPath) {
+ com.google.common.base.Preconditions.checkArgument(output != null, "there is no related output for job :" + idOrPath);
+ }
+
+ /**
+ * check the hdfs path exists.
+ *
+ * @param hdfsPath
+ * @return
+ */
+ public boolean isHdfsPathExists(String hdfsPath) {
+ if (StringUtils.isBlank(hdfsPath)) {
+ return false;
+ }
+
+ Path path = new Path(hdfsPath);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ try {
+ return fs.exists(path);
+ } catch (IOException e) {
+ logger.error("check the hdfs path [{}] exists failed, ", hdfsPath, e);
+ }
+
+ return false;
+ }
+
+ /**
+ * Get sample data from an hdfs log file:
+ * given a line count nLines, return the first nLines lines and the last nLines lines.
+ *
+ * @param resPath
+ * @return
+ */
+ public String getSampleDataFromHDFS(String resPath, final int nLines) {
+ try {
+ Path path = new Path(resPath);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ if (!fs.exists(path)) {
+ return null;
+ }
+
+ FileStatus fileStatus = fs.getFileStatus(path);
+ try (FSDataInputStream din = fs.open(path);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(din, "UTF-8"))) {
+
+ String line;
+ StringBuilder sampleData = new StringBuilder();
+ for (int i = 0; i < nLines && (line = reader.readLine()) != null; i++) {
+ if (sampleData.length() > 0) {
+ sampleData.append('\n');
+ }
+ sampleData.append(line);
+ }
+
+ int offset = sampleData.toString().getBytes("UTF-8").length + 1;
+ if (offset < fileStatus.getLen()) {
+ sampleData.append("\n================================================================\n");
+ sampleData.append(tailHdfsFileInputStream(din, offset, fileStatus.getLen(), nLines));
+ }
+ return sampleData.toString();
+ }
+ } catch (IOException e) {
+ logger.error("get sample data from hdfs log file [{}] failed!", resPath, e);
+ return null;
+ }
+ }
+
+ /**
+ * Get the last nLines lines from the end of an hdfs file input stream;
+ * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
+ *
+ * @param hdfsDin
+ * @param startPos
+ * @param endPos
+ * @param nLines
+ * @return
+ * @throws IOException
+ */
+ private String tailHdfsFileInputStream(FSDataInputStream hdfsDin, final long startPos, final long endPos,
+ final int nLines) throws IOException {
+ com.google.common.base.Preconditions.checkNotNull(hdfsDin);
+ com.google.common.base.Preconditions.checkArgument(startPos < endPos && startPos >= 0);
+ com.google.common.base.Preconditions.checkArgument(nLines >= 0);
+
+ Deque<String> deque = new ArrayDeque<>();
+ int buffSize = 8192;
+ byte[] byteBuf = new byte[buffSize];
+
+ long pos = endPos;
+
+ // the log file usually ends with '\n'; step back so the final line is counted
+ hdfsDin.seek(pos - 1);
+ int lastChar = hdfsDin.read();
+ if ('\n' == lastChar) {
+ pos--;
+ }
+
+ int bytesRead = (int) ((pos - startPos) % buffSize);
+ if (bytesRead == 0) {
+ bytesRead = buffSize;
+ }
+
+ pos -= bytesRead;
+ int lines = nLines;
+ while (lines > 0 && pos >= startPos) {
+ bytesRead = hdfsDin.read(pos, byteBuf, 0, bytesRead);
+
+ int last = bytesRead;
+ for (int i = bytesRead - 1; i >= 0 && lines > 0; i--) {
+ if (byteBuf[i] == '\n') {
+ deque.push(new String(byteBuf, i, last - i, StandardCharsets.UTF_8));
+ lines--;
+ last = i;
+ }
+ }
+
+ if (lines > 0 && last > 0) {
+ deque.push(new String(byteBuf, 0, last, StandardCharsets.UTF_8));
+ }
+
+ bytesRead = buffSize;
+ pos -= bytesRead;
+ }
+
+ StringBuilder sb = new StringBuilder();
+ while (!deque.isEmpty()) {
+ sb.append(deque.pop());
+ }
+
+ return sb.length() > 0 && sb.charAt(0) == '\n' ? sb.substring(1) : sb.toString();
+ }
+
+
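tailHdfsFileInputStream scans the file backwards in fixed-size blocks and pushes each discovered line (and each cross-block fragment) onto a deque, so popping restores file order. The same technique works on any seekable stream; below is a self-contained local sketch under that assumption, with RandomAccessFile standing in for FSDataInputStream:

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayDeque;
    import java.util.Deque;

    public class TailDemo {

        // return the last nLines lines of the file, scanning backwards in blocks
        static String tail(RandomAccessFile file, int nLines) throws IOException {
            Deque<String> deque = new ArrayDeque<>();
            final int buffSize = 8192;
            byte[] buf = new byte[buffSize];
            long pos = file.length();

            // the file usually ends with '\n'; step back so the last line counts
            if (pos > 0) {
                file.seek(pos - 1);
                if (file.read() == '\n') {
                    pos--;
                }
            }

            // size the first (partial) block so later blocks align to buffSize
            int bytesRead = (int) (pos % buffSize);
            if (bytesRead == 0) {
                bytesRead = buffSize;
            }

            pos -= bytesRead;
            int lines = nLines;
            while (lines > 0 && pos >= 0) {
                file.seek(pos);
                file.readFully(buf, 0, bytesRead);

                int last = bytesRead;
                for (int i = bytesRead - 1; i >= 0 && lines > 0; i--) {
                    if (buf[i] == '\n') {
                        deque.push(new String(buf, i, last - i, StandardCharsets.UTF_8));
                        lines--;
                        last = i;
                    }
                }
                // carry the fragment that continues into the previous block
                if (lines > 0 && last > 0) {
                    deque.push(new String(buf, 0, last, StandardCharsets.UTF_8));
                }
                bytesRead = buffSize;
                pos -= bytesRead;
            }

            StringBuilder sb = new StringBuilder();
            while (!deque.isEmpty()) {
                sb.append(deque.pop());
            }
            // lines were pushed with their leading '\n'; trim the first one
            return sb.length() > 0 && sb.charAt(0) == '\n' ? sb.substring(1) : sb.toString();
        }

        public static void main(String[] args) throws IOException {
            try (RandomAccessFile file = new RandomAccessFile(args[0], "r")) {
                System.out.println(tail(file, 100));
            }
        }
    }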
public List<AbstractExecutable> getAllExecutables() {
try {
List<AbstractExecutable> ret = Lists.newArrayList();
@@ -342,12 +548,12 @@ public class ExecutableManager {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
if (task.getStatus() == ExecutableState.RUNNING) {
- updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+ updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, null, task.getLogPath());
break;
}
}
}
- updateJobOutput(jobId, ExecutableState.READY, null, null);
+ updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, null, null, job.getLogPath());
}
public void resumeJob(String jobId) {
@@ -360,7 +566,7 @@ public class ExecutableManager {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
if (task.getStatus() == ExecutableState.ERROR || task.getStatus() == ExecutableState.STOPPED) {
- updateJobOutput(task.getId(), ExecutableState.READY, null, "no output");
+ updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, null, "no output", task.getLogPath());
break;
}
}
@@ -372,7 +578,7 @@ public class ExecutableManager {
info.remove(AbstractExecutable.END_TIME);
}
}
- updateJobOutput(jobId, ExecutableState.READY, info, null);
+ updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.READY, info, null, job.getLogPath());
}
public void discardJob(String jobId) {
@@ -394,11 +600,11 @@ public class ExecutableManager {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
if (!task.getStatus().isFinalState()) {
- updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
+ updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.DISCARDED, null, null, task.getLogPath());
}
}
}
- updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
+ updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.DISCARDED, null, null, job.getLogPath());
}
public void rollbackJob(String jobId, String stepId) {
@@ -412,13 +618,13 @@ public class ExecutableManager {
for (AbstractExecutable task : tasks) {
if (task.getId().compareTo(stepId) >= 0) {
logger.debug("rollback task : " + task);
- updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
+ updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath());
}
}
}
if (job.getStatus() == ExecutableState.SUCCEED) {
- updateJobOutput(job.getId(), ExecutableState.READY, null, null);
+ updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), job.getId(), ExecutableState.READY, null, null, job.getLogPath());
}
}
@@ -439,12 +645,12 @@ public class ExecutableManager {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
for (AbstractExecutable task : tasks) {
if (!task.getStatus().isFinalState()) {
- updateJobOutput(task.getId(), ExecutableState.STOPPED, null, null);
+ updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.STOPPED, null, null, task.getLogPath());
break;
}
}
}
- updateJobOutput(jobId, ExecutableState.STOPPED, null, null);
+ updateJobOutput(job.getParam(MetadataConstants.P_PROJECT_NAME), jobId, ExecutableState.STOPPED, null, null, job.getLogPath());
}
public ExecutableOutputPO getJobOutput(String jobId) {
@@ -456,7 +662,7 @@ public class ExecutableManager {
}
}
- public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
+ public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) {
// abort early if the current thread has already been interrupted
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("Current thread is interruptted, aborting");
@@ -494,6 +700,39 @@ public class ExecutableManager {
logger.error("error change job:" + jobId + " to " + newStatus);
throw new RuntimeException(e);
}
+
+ if (project != null) {
+ //write output to HDFS
+ updateJobOutputToHDFS(project, jobId, output, logPath);
+ }
+ }
+
+ public void updateJobOutputToHDFS(String project, String jobId, String output, String logPath) {
+ ExecutableOutputPO jobOutput = getJobOutput(jobId);
+ if (null != output) {
+ jobOutput.setContent(output);
+ }
+ if (null != logPath) {
+ jobOutput.setLogPath(logPath);
+ }
+ String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId);
+
+ updateJobOutputToHDFS(outputHDFSPath, jobOutput);
+ }
+
+ public void updateJobOutputToHDFS(String resPath, ExecutableOutputPO obj) {
+ DataOutputStream dout = null;
+ try {
+ Path path = new Path(resPath);
+ FileSystem fs = HadoopUtil.getWorkingFileSystem();
+ dout = fs.create(path, true);
+ JsonUtil.writeValue(dout, obj);
+ } catch (Exception e) {
+ // if updating the output on hdfs fails, the next task should not be interrupted
+ logger.error("update job output [{}] to HDFS failed.", resPath, e);
+ } finally {
+ IOUtils.closeQuietly(dout);
+ }
}
private boolean needDestroyProcess(ExecutableState from, ExecutableState to) {
@@ -548,7 +787,7 @@ public class ExecutableManager {
if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
continue;
} else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
- updateJobOutput(task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "");
+ updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null);
}
break;
}
diff --git a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
index 3535d37..3341dff 100644
--- a/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/ExecutableManagerTest.java
@@ -75,7 +75,7 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
AbstractExecutable another = service.getJob(executable.getId());
assertJobEqual(executable, another);
- service.updateJobOutput(executable.getId(), ExecutableState.RUNNING, null, "test output");
+ service.updateJobOutput(null, executable.getId(), ExecutableState.RUNNING, null, "test output", null);
assertJobEqual(executable, service.getJob(executable.getId()));
}
@@ -98,21 +98,21 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
SucceedTestExecutable job = new SucceedTestExecutable();
String id = job.getId();
service.addJob(job);
- service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
- service.updateJobOutput(id, ExecutableState.ERROR, null, null);
- service.updateJobOutput(id, ExecutableState.READY, null, null);
- service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
- service.updateJobOutput(id, ExecutableState.READY, null, null);
- service.updateJobOutput(id, ExecutableState.RUNNING, null, null);
- service.updateJobOutput(id, ExecutableState.SUCCEED, null, null);
+ service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+ service.updateJobOutput(null, id, ExecutableState.ERROR, null, null, null);
+ service.updateJobOutput(null, id, ExecutableState.READY, null, null, null);
+ service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+ service.updateJobOutput(null, id, ExecutableState.READY, null, null, null);
+ service.updateJobOutput(null, id, ExecutableState.RUNNING, null, null, null);
+ service.updateJobOutput(null, id, ExecutableState.SUCCEED, null, null, null);
}
@Test(expected = IllegalStateTranferException.class)
public void testInvalidStateTransfer() {
SucceedTestExecutable job = new SucceedTestExecutable();
service.addJob(job);
- service.updateJobOutput(job.getId(), ExecutableState.ERROR, null, null);
- service.updateJobOutput(job.getId(), ExecutableState.STOPPED, null, null);
+ service.updateJobOutput(null, job.getId(), ExecutableState.ERROR, null, null, null);
+ service.updateJobOutput(null, job.getId(), ExecutableState.STOPPED, null, null, null);
}
private static void assertJobEqual(Executable one, Executable another) {
diff --git a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
index 0855012..71fc19f 100644
--- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java
@@ -145,8 +145,8 @@ public class DefaultSchedulerTest extends BaseSchedulerTest {
job.addTask(task1);
job.addTask(task2);
execMgr.addJob(job);
- ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(task2.getId(),
- ExecutableState.RUNNING, null, null);
+ ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(null, task2.getId(),
+ ExecutableState.RUNNING, null, null, null);
waitForJobFinish(job.getId(), MAX_WAIT_TIME);
Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState());
Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
new file mode 100644
index 0000000..5a90fb2
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Locale;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
+ private final Object flushLogLock = new Object();
+ private final Object initWriterLock = new Object();
+ private final Object closeLock = new Object();
+ private final Object fileSystemLock = new Object();
+
+ private FSDataOutputStream outStream = null;
+ private BufferedWriter bufferedWriter = null;
+
+ private FileSystem fileSystem = null;
+
+ private ExecutorService appendHdfsService = null;
+
+ private BlockingDeque<LoggingEvent> logBufferQue = null;
+ private static final double QUEUE_FLUSH_THRESHOLD = 0.2;
+
+ //configurable
+ private int logQueueCapacity = 8192;
+ private int flushInterval = 5000;
+ private String hdfsWorkingDir;
+
+ public int getLogQueueCapacity() {
+ return logQueueCapacity;
+ }
+
+ public void setLogQueueCapacity(int logQueueCapacity) {
+ this.logQueueCapacity = logQueueCapacity;
+ }
+
+ public BlockingDeque<LoggingEvent> getLogBufferQue() {
+ return logBufferQue;
+ }
+
+ public int getFlushInterval() {
+ return flushInterval;
+ }
+
+ public void setFlushInterval(int flushInterval) {
+ this.flushInterval = flushInterval;
+ }
+
+ public String getHdfsWorkingDir() {
+ return hdfsWorkingDir;
+ }
+
+ public void setHdfsWorkingDir(String hdfsWorkingDir) {
+ this.hdfsWorkingDir = hdfsWorkingDir;
+ }
+
+ public FileSystem getFileSystem() {
+ if (null == fileSystem) {
+ return getFileSystem(new Configuration());
+ }
+ return fileSystem;
+ }
+
+ private FileSystem getFileSystem(Configuration conf) {
+ synchronized (fileSystemLock) {
+ if (null == fileSystem) {
+ try {
+ fileSystem = new Path(hdfsWorkingDir).getFileSystem(conf);
+ } catch (IOException e) {
+ LogLog.error("Failed to create the file system, ", e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return fileSystem;
+ }
+
+ public boolean isWriterInited() {
+ synchronized (initWriterLock) {
+ return null != bufferedWriter;
+ }
+ }
+
+ abstract void init();
+
+ abstract String getAppenderName();
+
+ /**
+ * Initialize the resources the appender relies on.
+ */
+ @Override
+ public void activateOptions() {
+ LogLog.warn(String.format(Locale.ROOT, "%s starting ...", getAppenderName()));
+ LogLog.warn("hdfsWorkingDir -> " + getHdfsWorkingDir());
+
+ init();
+
+ logBufferQue = new LinkedBlockingDeque<>(getLogQueueCapacity());
+ appendHdfsService = Executors.newSingleThreadExecutor();
+ appendHdfsService.execute(this::checkAndFlushLog);
+ Runtime.getRuntime().addShutdownHook(new Thread(this::close));
+
+ LogLog.warn(String.format(Locale.ROOT, "%s started ...", getAppenderName()));
+ }
+
+ @Override
+ public void append(LoggingEvent loggingEvent) {
+ try {
+ boolean offered = logBufferQue.offer(loggingEvent, 10, TimeUnit.SECONDS);
+ if (!offered) {
+ LogLog.error("LogEvent cannot put into the logBufferQue, log event content:");
+ printLoggingEvent(loggingEvent);
+ }
+ } catch (InterruptedException e) {
+ LogLog.warn("Append logging event interrupted!", e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ @Override
+ public void close() {
+ synchronized (closeLock) {
+ if (!this.closed) {
+ this.closed = true;
+
+ List<LoggingEvent> transaction = Lists.newArrayList();
+ try {
+ flushLog(getLogBufferQue().size(), transaction);
+
+ closeWriter();
+ if (appendHdfsService != null && !appendHdfsService.isShutdown()) {
+ appendHdfsService.shutdownNow();
+ }
+ } catch (Exception e) {
+ transaction.forEach(this::printLoggingEvent);
+ try {
+ while (!getLogBufferQue().isEmpty()) {
+ printLoggingEvent(getLogBufferQue().take());
+ }
+ } catch (Exception ie) {
+ LogLog.error("clear the logging buffer queue failed!", ie);
+ }
+ LogLog.error(String.format(Locale.ROOT, "close %s failed!", getAppenderName()), e);
+ }
+ LogLog.warn(String.format(Locale.ROOT, "%s closed ...", getAppenderName()));
+ }
+ }
+ }
+
+ private void closeWriter() {
+ IOUtils.closeQuietly(bufferedWriter);
+ IOUtils.closeQuietly(outStream);
+ }
+
+ @Override
+ public boolean requiresLayout() {
+ return true;
+ }
+
+ /**
+ * Subclasses may need to wait until dependent components have finished initializing.
+ *
+ * @return
+ */
+ abstract boolean isSkipCheckAndFlushLog();
+
+ /**
+ * Drain a portion of the log buffer queue (printing dropped events to stderr) when it is full.
+ */
+ private void clearLogBufferQueueWhenBlocked() {
+ if (logBufferQue.size() >= getLogQueueCapacity()) {
+ int removeNum = getLogQueueCapacity() / 5;
+ while (removeNum > 0) {
+ try {
+ LoggingEvent loggingEvent = logBufferQue.take();
+ printLoggingEvent(loggingEvent);
+ } catch (Exception ex) {
+ LogLog.error("Take event interrupted!", ex);
+ }
+ removeNum--;
+ }
+ }
+ }
+
+ /**
+ * print the logging event to stderr
+ * @param loggingEvent
+ */
+ private void printLoggingEvent(LoggingEvent loggingEvent) {
+ try {
+ String log = getLayout().format(loggingEvent);
+ LogLog.error(log.endsWith("\n") ? log.substring(0, log.length() - 1) : log);
+ if (null != loggingEvent.getThrowableStrRep()) {
+ for (String stack : loggingEvent.getThrowableStrRep()) {
+ LogLog.error(stack);
+ }
+ }
+ } catch (Exception e) {
+ LogLog.error("print logging event failed!", e);
+ }
+ }
+
+ /**
+ * flush the log to hdfs when conditions are satisfied.
+ */
+ protected void checkAndFlushLog() {
+ long start = System.currentTimeMillis();
+ do {
+ List<LoggingEvent> transaction = Lists.newArrayList();
+ try {
+ if (isSkipCheckAndFlushLog()) {
+ continue;
+ }
+
+ int eventSize = getLogBufferQue().size();
+ if (eventSize > getLogQueueCapacity() * QUEUE_FLUSH_THRESHOLD
+ || System.currentTimeMillis() - start > getFlushInterval()) {
+ // update start time before doing flushLog to avoid exception when flushLog
+ start = System.currentTimeMillis();
+ flushLog(eventSize, transaction);
+ } else {
+ Thread.sleep(getFlushInterval() / 100);
+ }
+ } catch (Exception e) {
+ transaction.forEach(this::printLoggingEvent);
+ clearLogBufferQueueWhenBlocked();
+ LogLog.error("Error occurred when consume event", e);
+ }
+ } while (!closed);
+ }
+
+ /**
+ * Initialize the hdfs writer and create the hdfs file at outPath.
+ * Kerberos authentication may be needed, so the fileSystem is initialized here.
+ *
+ * @param outPath
+ */
+ protected boolean initHdfsWriter(Path outPath, Configuration conf) {
+ synchronized (initWriterLock) {
+ closeWriter();
+ bufferedWriter = null;
+ outStream = null;
+
+ int retry = 10;
+ while (retry-- > 0) {
+ try {
+ fileSystem = getFileSystem(conf);
+ outStream = fileSystem.create(outPath, true);
+ break;
+ } catch (Exception e) {
+ LogLog.error("fail to create stream for path: " + outPath, e);
+ }
+
+ try {
+ initWriterLock.wait(1000);//waiting for acl to turn to current user
+ } catch (InterruptedException e) {
+ LogLog.warn("Init writer interrupted!", e);
+ // Restore interrupted state...
+ Thread.currentThread().interrupt();
+ }
+ }
+ if (null != outStream) {
+ bufferedWriter = new BufferedWriter(new OutputStreamWriter(outStream, StandardCharsets.UTF_8));
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * write the data into the buffer.
+ *
+ * @param message
+ * @throws IOException
+ */
+ protected void write(String message) throws IOException {
+ bufferedWriter.write(message);
+ }
+
+ /**
+ * Write the log event, including any throwable stack trace, into the buffer.
+ *
+ * @param loggingEvent
+ * @throws IOException
+ */
+ protected void writeLogEvent(LoggingEvent loggingEvent) throws IOException {
+ if (null != loggingEvent) {
+ write(getLayout().format(loggingEvent));
+
+ if (null != loggingEvent.getThrowableStrRep()) {
+ for (String message : loggingEvent.getThrowableStrRep()) {
+ write(message);
+ write("\n");
+ }
+ }
+ }
+ }
+
+ /**
+ * Write the given number of events from the queue into the buffer.
+ *
+ * @param eventSize
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ abstract void doWriteLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException;
+
+ /**
+ * flush the buffer data to HDFS.
+ *
+ * @throws IOException
+ */
+ private void flush() throws IOException {
+ bufferedWriter.flush();
+ outStream.hsync();
+ }
+
+ /**
+ * Take all pending events from the queue and write them to HDFS immediately.
+ *
+ * @param eventSize
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ protected void flushLog(int eventSize, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+ if (eventSize <= 0) {
+ return;
+ }
+
+ synchronized (flushLogLock) {
+ if (eventSize > getLogBufferQue().size()) {
+ eventSize = getLogBufferQue().size();
+ }
+
+ doWriteLog(eventSize, transaction);
+
+ flush();
+ }
+ }
+}
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
new file mode 100644
index 0000000..0e5ad3c
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkDriverHdfsLogAppender.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark.common.logging;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SparkDriverHdfsLogAppender extends AbstractHdfsLogAppender {
+
+ private String logPath;
+
+ // kerberos
+ private boolean kerberosEnable = false;
+ private String kerberosPrincipal;
+ private String kerberosKeytab;
+
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public void setLogPath(String logPath) {
+ this.logPath = logPath;
+ }
+
+ public boolean isKerberosEnable() {
+ return kerberosEnable;
+ }
+
+ public void setKerberosEnable(boolean kerberosEnable) {
+ this.kerberosEnable = kerberosEnable;
+ }
+
+ public String getKerberosPrincipal() {
+ return kerberosPrincipal;
+ }
+
+ public void setKerberosPrincipal(String kerberosPrincipal) {
+ this.kerberosPrincipal = kerberosPrincipal;
+ }
+
+ public String getKerberosKeytab() {
+ return kerberosKeytab;
+ }
+
+ public void setKerberosKeytab(String kerberosKeytab) {
+ this.kerberosKeytab = kerberosKeytab;
+ }
+
+ @Override
+ public void init() {
+ LogLog.warn("spark.driver.log4j.appender.hdfs.File -> " + getLogPath());
+ LogLog.warn("kerberosEnable -> " + isKerberosEnable());
+ if (isKerberosEnable()) {
+ LogLog.warn("kerberosPrincipal -> " + getKerberosPrincipal());
+ LogLog.warn("kerberosKeytab -> " + getKerberosKeytab());
+ }
+ }
+
+ @Override
+ String getAppenderName() {
+ return "SparkDriverHdfsLogAppender";
+ }
+
+ @Override
+ public boolean isSkipCheckAndFlushLog() {
+ return false;
+ }
+
+ @Override
+ public void doWriteLog(int eventSize, List<LoggingEvent> transaction)
+ throws IOException, InterruptedException {
+ if (!isWriterInited()) {
+ Configuration conf = new Configuration();
+ if (isKerberosEnable()) {
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(getKerberosPrincipal(), getKerberosKeytab());
+ }
+ if (!initHdfsWriter(new Path(getLogPath()), conf)) {
+ LogLog.error("init the hdfs writer failed!");
+ }
+ }
+
+ while (eventSize > 0) {
+ LoggingEvent loggingEvent = getLogBufferQue().take();
+ transaction.add(loggingEvent);
+ writeLogEvent(loggingEvent);
+ eventSize--;
+ }
+ }
+}
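For local experimentation the appender can also be wired programmatically instead of through spark-driver-log4j.properties; a hedged sketch (log4j 1.x API; it needs a reachable HDFS, and both paths below are placeholders):

    import org.apache.kylin.common.logging.SensitivePatternLayout;
    import org.apache.kylin.engine.spark.common.logging.SparkDriverHdfsLogAppender;
    import org.apache.log4j.Level;
    import org.apache.log4j.Logger;

    public class AppenderWiringDemo {
        public static void main(String[] args) {
            SparkDriverHdfsLogAppender hdfs = new SparkDriverHdfsLogAppender();
            hdfs.setHdfsWorkingDir("hdfs://sandbox/kylin");          // placeholder
            hdfs.setLogPath("hdfs://sandbox/kylin/demo/driver.log"); // placeholder
            hdfs.setLogQueueCapacity(5000);
            hdfs.setFlushInterval(5000);

            SensitivePatternLayout layout = new SensitivePatternLayout();
            layout.setConversionPattern("%d{ISO8601} %-5p [%t] %c{2} : %m%n");
            hdfs.setLayout(layout);
            hdfs.activateOptions();   // starts the background flush thread

            Logger logger = Logger.getLogger("demo");
            logger.setLevel(Level.INFO);
            logger.addAppender(hdfs);
            logger.info("password=secret is masked before it reaches HDFS");

            hdfs.close();             // drains the queue and stops the thread
        }
    }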
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 98f63a1..3cc1b60 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -26,15 +26,14 @@ import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
-
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
-import java.util.Map.Entry;
-
import java.util.Set;
+import java.util.Objects;
+import java.util.Map.Entry;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -108,6 +107,7 @@ public class NSparkExecutable extends AbstractExecutable {
CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = cubeMgr.getCube(this.getCubeName());
KylinConfig config = cube.getConfig();
+ this.setLogPath(getSparkDriverLogHdfsPath(context.getConfig()));
config = wrapConfig(config);
String sparkHome = KylinConfig.getSparkHome();
@@ -189,15 +189,12 @@ public class NSparkExecutable extends AbstractExecutable {
/**
 * Generate the spark driver log HDFS path: job output store path + timestamp + ".log"
- *
- * @param
- * @return
*/
- /*public String getSparkDriverLogHdfsPath(KylinConfig config) {
- return String.format("%s.%s.log", config.getJobTmpOutputStorePath(getProject(), getId()),
+ public String getSparkDriverLogHdfsPath(KylinConfig config) {
+ return String.format(Locale.ROOT, "%s.%s.log", config.getJobOutputStorePath(getParam(MetadataConstants.P_PROJECT_NAME), getId()),
System.currentTimeMillis());
- }*/
-
+ }
+
protected KylinConfig wrapConfig(ExecutableContext context) {
return wrapConfig(context.getConfig());
}
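
The now un-commented getSparkDriverLogHdfsPath fixes the log file naming. A throwaway illustration of the resulting shape (the store path below is invented; only the format string is the commit's):

import java.util.Locale;

String storePath = "hdfs://ns1/kylin/kylin_metadata/my_project/job_tmp/5c2a01-step00"; // invented
String logPath = String.format(Locale.ROOT, "%s.%s.log", storePath, System.currentTimeMillis());
// -> hdfs://ns1/kylin/kylin_metadata/my_project/job_tmp/5c2a01-step00.1605174647123.log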
@@ -206,15 +203,18 @@ public class NSparkExecutable extends AbstractExecutable {
String project = getParam(MetadataConstants.P_PROJECT_NAME);
Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
+        HashMap<String, String> jobOverrides = new HashMap<>();
String parentId = getParentId();
- originalConfig.setProperty("job.id", StringUtils.defaultIfBlank(parentId, getId()));
- originalConfig.setProperty("job.project", project);
+ jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
+ jobOverrides.put("job.project", project);
if (StringUtils.isNotBlank(parentId)) {
- originalConfig.setProperty("job.stepId", getId());
+ jobOverrides.put("job.stepId", getId());
}
- originalConfig.setProperty("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+ jobOverrides.put("user.timezone", KylinConfig.getInstanceFromEnv().getTimeZone());
+ jobOverrides.put("spark.driver.log4j.appender.hdfs.File",
+ Objects.isNull(this.getLogPath()) ? "null" : this.getLogPath());
- return originalConfig;
+ return KylinConfigExt.createInstance(originalConfig, jobOverrides);
}
private void killOrphanApplicationIfExists(KylinConfig config, String jobId) {
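
Note the behavioral shift in wrapConfig: the per-job settings no longer mutate the shared KylinConfig but are layered into a KylinConfigExt, so concurrent jobs cannot clobber each other's job.id or log path. A read-back sketch using only calls that appear in this commit (baseConfig is a stand-in for the config wrapConfig receives, and the path is invented):

Map<String, String> jobOverrides = new HashMap<>();
jobOverrides.put("spark.driver.log4j.appender.hdfs.File", "hdfs://ns1/kylin/job_tmp/step.log");
KylinConfigExt ext = KylinConfigExt.createInstance(baseConfig, jobOverrides);
Map<String, String> extended = ext.getExtendedOverrides();
// getExtendedOverrides may return null, as the Objects.nonNull guard below anticipates
String hdfsLogFile = (extended == null) ? null
        : extended.get("spark.driver.log4j.appender.hdfs.File");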
@@ -302,27 +302,64 @@ public class NSparkExecutable extends AbstractExecutable {
sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
}
+ replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride);
+ return sparkConfigOverride;
+ }
+
+ private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) {
+ String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
+ StringBuilder sb = new StringBuilder();
+ if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
+ sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+ }
+
String serverIp = "127.0.0.1";
try {
serverIp = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn("use the InetAddress get local ip failed!", e);
}
+ String serverPort = config.getServerPort();
- String log4jConfiguration = "file:" + config.getLogSparkPropertiesFile();
+ String hdfsWorkingDir = config.getHdfsWorkingDirectory();
- String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
- StringBuilder sb = new StringBuilder();
- if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
- sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
+ String sparkDriverHdfsLogPath = null;
+ if (config instanceof KylinConfigExt) {
+ Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+ if (Objects.nonNull(extendedOverrides)) {
+ sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+ }
}
+ /*
+ if (config.isCloud()) {
+ String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory();
+ if (StringUtils.isNotBlank(logLocalWorkingDirectory)) {
+ hdfsWorkingDir = logLocalWorkingDirectory;
+ sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
+ }
+ }
+ */
+
+ String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
+ sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
+
+ if (config.isKerberosEnabled()) {
+ sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
+ sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath()));
+ if (config.getPlatformZKEnable()) {
+ sb.append(String.format(Locale.ROOT, " -Djava.security.auth.login.config=%s", config.getKerberosJaasConfPath()));
+ sb.append(String.format(Locale.ROOT, " -Djava.security.krb5.conf=%s", config.getKerberosKrb5ConfPath()));
+ }
+ }
+ sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
-
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
- return sparkConfigOverride;
}
protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
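
To make the assembled -D flags concrete, the spark.driver.extraJavaOptions value built by replaceSparkDriverJavaOpsConfIfNeeded ends up shaped roughly like the literal below; every host, port, and path here is invented, only the flag names come from the code above:

String exampleDriverOpts =
        " -Dlog4j.configuration=file:/opt/kylin/conf/spark-driver-log4j.properties "
      + " -Dkylin.kerberos.enabled=false "
      + " -Dkylin.hdfs.working.dir=hdfs://ns1/kylin "
      + " -Dspark.driver.log4j.appender.hdfs.File=hdfs://ns1/kylin/my_project/job_tmp/5c2a01-step00.1605174647123.log "
      + " -Dspark.driver.rest.server.ip=10.0.0.8 "
      + " -Dspark.driver.rest.server.port=7070 "
      + " -Dspark.driver.param.taskId=5c2a01-step00 "
      + " -Dspark.driver.local.logDir=/opt/kylin/logs/spark ";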
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
index 8607348..dba8132 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/BasicController.java
@@ -50,6 +50,8 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
+import static org.apache.kylin.shaded.com.google.common.net.HttpHeaders.CONTENT_DISPOSITION;
+
/**
*/
public class BasicController {
@@ -132,6 +134,20 @@ public class BasicController {
}
}
+ protected void setDownloadResponse(InputStream inputStream, String fileName, String contentType,
+ final HttpServletResponse response) throws IOException {
+ try (OutputStream output = response.getOutputStream()) {
+ response.reset();
+ response.setContentType(contentType);
+ response.setHeader(CONTENT_DISPOSITION, "attachment; filename=\"" + fileName + "\"");
+ IOUtils.copyLarge(inputStream, output);
+ output.flush();
+ } catch (IOException e) {
+ logger.error("Failed download log File!");
+ throw e;
+ }
+ }
+
public boolean isAdmin() {
boolean isAdmin = false;
Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
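
A hypothetical caller of the new helper, written as a fragment inside a BasicController subclass (java.io.ByteArrayInputStream and org.springframework.http.MediaType assumed imported); it mirrors how downloadLogFile in JobController, further down, streams a generated file to the client:

public void downloadDiagnosisSketch(HttpServletResponse response) throws IOException {
    byte[] bytes = "some diagnostic text".getBytes("UTF-8"); // illustrative payload
    setDownloadResponse(new ByteArrayInputStream(bytes), "diag.log",
            MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
}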
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 65c036c..c28362c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -18,11 +18,13 @@
package org.apache.kylin.rest.controller;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Locale;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.constant.JobStatusEnum;
@@ -30,16 +32,22 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.response.ResponseCode;
import org.apache.kylin.rest.service.JobService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RequestParam;
+
+import javax.servlet.http.HttpServletResponse;
@Controller
@RequestMapping(value = "jobs")
@@ -160,6 +168,7 @@ public class JobController extends BasicController {
* @return
* @throws IOException
*/
+
@RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
"application/json" })
@ResponseBody
@@ -167,11 +176,34 @@ public class JobController extends BasicController {
Map<String, String> result = new HashMap<String, String>();
result.put("jobId", jobId);
result.put("stepId", String.valueOf(stepId));
- result.put("cmd_output", jobService.getExecutableManager().getOutput(stepId).getVerboseMsg());
+ result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
return result;
}
/**
+     * Download a job step's log from HDFS
+     * @param jobId id of the job the step belongs to
+     * @param stepId id of the step whose log is downloaded
+     * @param project name of the project that owns the job
+     * @param response servlet response the log file is streamed into
+     * @return an envelope response with an empty payload on success
+     */
+ @RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
+ @ResponseBody
+ public EnvelopeResponse<String> downloadLogFile(@PathVariable("job_id") String jobId,
+ @PathVariable("step_id") String stepId, @RequestParam(value = "project") String project,
+ HttpServletResponse response) throws IOException {
+ checkRequiredArg("job_id", jobId);
+ checkRequiredArg("step_id", stepId);
+ checkRequiredArg("project", project);
+ String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+
+ String jobOutput = jobService.getAllJobOutput(jobId, stepId);
+ setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+ return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+ }
+
+ /**
* Resume a cube job
*
* @return
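
From the client side, the new endpoint can be exercised as in the standalone sketch below. The server address and the ADMIN/KYLIN credentials are assumptions, and /kylin/api is Kylin's usual REST base path prepended to the "jobs" mapping above:

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Base64;

public class DownloadStepLogSketch {
    public static void main(String[] args) throws Exception {
        String jobId = args[0], stepId = args[1], project = args[2];
        URL url = new URL(String.format(
                "http://localhost:7070/kylin/api/jobs/%s/steps/%s/log?project=%s",
                jobId, stepId, project));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestProperty("Authorization", "Basic "
                + Base64.getEncoder().encodeToString("ADMIN:KYLIN".getBytes("UTF-8")));
        try (InputStream in = conn.getInputStream()) {
            // save under the same name the server suggests: {project}_{stepId}.log
            Files.copy(in, Paths.get(project + "_" + stepId + ".log"),
                    StandardCopyOption.REPLACE_EXISTING);
        }
    }
}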
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index f7be7d1..2c0c20c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -64,6 +64,7 @@ import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.CheckpointExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.Output;
import org.apache.kylin.job.lock.zookeeper.ZookeeperJobLock;
import org.apache.kylin.metadata.model.ISourceAware;
@@ -479,6 +480,16 @@ public class JobService extends BasicService implements InitializingBean {
return getExecutableManager().getOutput(id);
}
+ public String getJobOutput(String jobId, String stepId) {
+ ExecutableManager executableManager = getExecutableManager();
+ return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
+ }
+
+ public String getAllJobOutput(String jobId, String stepId) {
+ ExecutableManager executableManager = getExecutableManager();
+ return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
+ }
+
protected JobInstance getSingleJobInstance(AbstractExecutable job) {
Message msg = MsgPicker.getMsg();
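
On the two service methods just added: judging by the Integer.MAX_VALUE argument, getJobOutput returns a bounded excerpt of the step log while getAllJobOutput pulls the entire HDFS log file, which is what the download endpoint needs. A fragment showing the intended contrast (jobService, jobId, and stepId come from the surrounding controller context):

String excerpt = jobService.getJobOutput(jobId, stepId);    // bounded, for the step output dialog
String fullLog = jobService.getAllJobOutput(jobId, stepId); // complete file, for the /log download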