You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:50 UTC
[kylin] 03/06: KYLIN-4813 Fix several bugs for executor-side log
collection
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 86aae4dc1cd6f364c36e9f47db84b20956b2b911
Author: XiaoxiangYu <xx...@apache.org>
AuthorDate: Sun Nov 22 22:04:30 2020 +0800
KYLIN-4813 Fix several bugs for executor-side log collection
- add --files to upload user-defined log4j properties
- fix ClassName error
- fix executor.extraJavaOptions being overwritten on the Driver side
- fix security issue (HTTP response splitting)
- code style etc.
---
build/conf/spark-executor-log4j.properties | 4 +-
.../org/apache/kylin/common/KylinConfigBase.java | 6 +-
.../src/main/resources/kylin-defaults.properties | 6 +-
.../kylin/job/execution/ExecutableManager.java | 26 +++----
.../common/logging/SparkExecutorHdfsAppender.java | 24 +++++-
.../engine/spark/application/SparkApplication.java | 9 ++-
.../kylin/engine/spark/job/NSparkExecutable.java | 87 +++++++++++++---------
.../scala/org/apache/spark/sql/KylinSession.scala | 4 +-
.../kylin/rest/controller/JobController.java | 25 +++----
.../org/apache/kylin/rest/service/JobService.java | 4 +-
10 files changed, 110 insertions(+), 85 deletions(-)
diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
index 7cc5b04..fb5b7e3 100644
--- a/build/conf/spark-executor-log4j.properties
+++ b/build/conf/spark-executor-log4j.properties
@@ -27,7 +27,7 @@ log4j.appender.stderr.target=System.err
log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
-log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsAppender
log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
@@ -41,6 +41,6 @@ log4j.appender.hdfs.logQueueCapacity=5000
#flushPeriod count as millis
log4j.appender.hdfs.flushInterval=5000
-log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+log4j.appender.hdfs.layout=org.apache.kylin.common.logging.SensitivePatternLayout
#Don't add line number (%L) as it's too costly!
log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e2727fa..01dc374 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2863,8 +2863,10 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.query.intersect.separator", "|");
}
- @ConfigTag(ConfigTag.Tag.NOT_CLEAR)
- public String sparderFiles() {
+ /**
+ * Used to upload user-defined log4j configuration
+ */
+ public String sparkUploadFiles() {
try {
File storageFile = FileUtils.findFile(KylinConfigBase.getKylinHome() + "/conf",
"spark-executor-log4j.properties");
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 8fba3bc..858278f 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,7 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
-kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.spark.category=job
#kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
# manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -269,7 +269,6 @@ kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -D
# uncomment for HDP
#kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-#kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
### SPARK QUERY ENGINE CONFIGS (a.k.a. Sparder Context) ###
# Enlarge cores and memory to improve query performance in production env, please check https://cwiki.apache.org/confluence/display/KYLIN/User+Manual+4.X
@@ -287,11 +286,10 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
#kylin.query.spark-conf.spark.sql.shuffle.partitions=40
#kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
-kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
+kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project}
# uncomment for HDP
#kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
-#kylin.query.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current
### QUERY PUSH DOWN ###
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 590276d..42a9c99 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.job.execution;
@@ -255,10 +255,6 @@ public class ExecutableManager {
return result;
}
- public Output getOutputFromHDFSByJobId(String jobId) {
- return getOutputFromHDFSByJobId(jobId, jobId);
- }
-
public Output getOutputFromHDFSByJobId(String jobId, String stepId) {
return getOutputFromHDFSByJobId(jobId, stepId, 100);
}
@@ -593,7 +589,7 @@ public class ExecutableManager {
logger.warn("The job " + jobId + " has been discarded.");
}
throw new IllegalStateException(
- "The job " + job.getId() + " has already been finished and cannot be discarded.");
+ "The job " + job.getId() + " has already been finished and cannot be discarded.");
}
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -617,7 +613,7 @@ public class ExecutableManager {
for (AbstractExecutable task : tasks) {
if (task.getId().compareTo(stepId) >= 0) {
logger.debug("rollback task : " + task);
- updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", task.getLogPath());
+ updateJobOutput(task.getParam(MetadataConstants.P_PROJECT_NAME), task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", task.getLogPath());
}
}
}
@@ -634,11 +630,11 @@ public class ExecutableManager {
}
if (!(job.getStatus() == ExecutableState.READY
- || job.getStatus() == ExecutableState.RUNNING)) {
+ || job.getStatus() == ExecutableState.RUNNING)) {
logger.warn("The status of job " + jobId + " is " + job.getStatus().toString()
- + ". It's final state and cannot be transfer to be stopped!!!");
+ + ". It's final state and cannot be transfer to be stopped!!!");
throw new IllegalStateException(
- "The job " + job.getId() + " has already been finished and cannot be stopped.");
+ "The job " + job.getId() + " has already been finished and cannot be stopped.");
}
if (job instanceof DefaultChainedExecutable) {
List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
@@ -662,7 +658,6 @@ public class ExecutableManager {
}
public void updateJobOutput(String project, String jobId, ExecutableState newStatus, Map<String, String> info, String output, String logPath) {
- // when
if (Thread.currentThread().isInterrupted()) {
throw new RuntimeException("Current thread is interruptted, aborting");
}
@@ -701,7 +696,6 @@ public class ExecutableManager {
}
if (project != null) {
- //write output to HDFS
updateJobOutputToHDFS(project, jobId, output, logPath);
}
}
@@ -715,7 +709,7 @@ public class ExecutableManager {
jobOutput.setLogPath(logPath);
}
String outputHDFSPath = KylinConfig.getInstanceFromEnv().getJobOutputStorePath(project, jobId);
-
+ logger.debug("Update JobOutput To HDFS for {} to {} [{}]", jobId, outputHDFSPath, jobOutput.getContent() != null ? jobOutput.getContent().length() : -1);
updateJobOutputToHDFS(outputHDFSPath, jobOutput);
}
@@ -786,7 +780,7 @@ public class ExecutableManager {
if (executableDao.getJobOutput(task.getId()).getStatus().equals("SUCCEED")) {
continue;
} else if (executableDao.getJobOutput(task.getId()).getStatus().equals("RUNNING")) {
- updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String> newHashMap(), "", null);
+ updateJobOutput(null, task.getId(), ExecutableState.READY, Maps.<String, String>newHashMap(), "", null);
}
break;
}
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index 4a76022..381fd2d 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.kylin.engine.spark.common.logging;
import com.google.common.annotations.VisibleForTesting;
@@ -111,7 +129,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
@Override
String getAppenderName() {
- return "SparkExecutorHdfsLogAppender";
+ return "SparkExecutorHdfsAppender";
}
@Override
@@ -164,6 +182,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
String rollingDir = dateFormat.format(new Date(event.getTimeStamp()));
outPutPath = getOutPutDir(rollingDir);
}
+ LogLog.warn("Update to " + outPutPath);
}
private String getOutPutDir(String rollingDir) {
@@ -236,8 +255,7 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
}
private String parseHdfsWordingDir() {
- return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
- + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+ return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/");
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 84737cb..4ec461a 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -135,8 +135,13 @@ public abstract class SparkApplication {
}
if (!config.getSparkConfigOverride().isEmpty()) {
for (Map.Entry<String, String> entry : config.getSparkConfigOverride().entrySet()) {
- logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
- sparkConf.set(entry.getKey(), entry.getValue());
+ if (entry.getKey().contains("spark.executor.extraJavaOptions")) {
+ // Let NSparkExecutable#replaceSparkNodeJavaOpsConfIfNeeded (in JobServer) determine the executor's JVM-level configuration
+ logger.info("Do not override {}={}.", entry.getKey(), entry.getValue());
+ } else {
+ logger.info("Override user-defined spark conf, set {}={}.", entry.getKey(), entry.getValue());
+ sparkConf.set(entry.getKey(), entry.getValue());
+ }
}
}
} else if (!isJobOnCluster(sparkConf)) {
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 3cc1b60..987c215 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -122,8 +122,8 @@ public class NSparkExecutable extends AbstractExecutable {
String hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
logger.info("write hadoop conf is {} ", config.getBuildConf());
if (!config.getBuildConf().isEmpty()) {
- logger.info("write hadoop conf is {} ", config.getBuildConf());
- hadoopConf = config.getBuildConf();
+ logger.info("write hadoop conf is {} ", config.getBuildConf());
+ hadoopConf = config.getBuildConf();
}
if (StringUtils.isEmpty(hadoopConf) && !config.isUTEnv() && !config.isZKLocal()) {
throw new RuntimeException(
@@ -203,7 +203,7 @@ public class NSparkExecutable extends AbstractExecutable {
String project = getParam(MetadataConstants.P_PROJECT_NAME);
Preconditions.checkState(StringUtils.isNotBlank(project), "job " + getId() + " project info is empty");
- HashMap<String, String> jobOverrides = new HashMap();
+ HashMap<String, String> jobOverrides = new HashMap<>();
String parentId = getParentId();
jobOverrides.put("job.id", StringUtils.defaultIfBlank(parentId, getId()));
jobOverrides.put("job.project", project);
@@ -250,7 +250,7 @@ public class NSparkExecutable extends AbstractExecutable {
}
private ExecuteResult runSparkSubmit(KylinConfig config, String hadoopConf, String jars,
- String kylinJobJar, String appArgs, String jobId) {
+ String kylinJobJar, String appArgs, String jobId) {
PatternedLogger patternedLogger;
if (config.isJobLogPrintEnabled()) {
patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
@@ -302,16 +302,21 @@ public class NSparkExecutable extends AbstractExecutable {
sparkConfigOverride.put("spark.hadoop.hive.metastore.sasl.enabled", "true");
}
- replaceSparkDriverJavaOpsConfIfNeeded(config, sparkConfigOverride);
+ replaceSparkNodeJavaOpsConfIfNeeded(true, config, sparkConfigOverride);
+ replaceSparkNodeJavaOpsConfIfNeeded(false, config, sparkConfigOverride);
return sparkConfigOverride;
}
- private void replaceSparkDriverJavaOpsConfIfNeeded(KylinConfig config, Map<String, String> sparkConfigOverride) {
- String sparkDriverExtraJavaOptionsKey = "spark.driver.extraJavaOptions";
+ /**
+ * Add property in spark.xxx.extraJavaOptions for AbstractHdfsLogAppender
+ * Please check the following for details.
+ * 1. conf/spark-driver-log4j.properties and conf/spark-executor-log4j.properties
+ * 2. AbstractHdfsLogAppender
+ */
+ private void replaceSparkNodeJavaOpsConfIfNeeded(boolean isDriver, KylinConfig config,
+ Map<String, String> sparkConfigOverride) {
+ String sparkNodeExtraJavaOptionsKey = isDriver ? "spark.driver.extraJavaOptions" : "spark.executor.extraJavaOptions";
StringBuilder sb = new StringBuilder();
- if (sparkConfigOverride.containsKey(sparkDriverExtraJavaOptionsKey)) {
- sb.append(sparkConfigOverride.get(sparkDriverExtraJavaOptionsKey));
- }
String serverIp = "127.0.0.1";
try {
@@ -320,31 +325,37 @@ public class NSparkExecutable extends AbstractExecutable {
logger.warn("use the InetAddress get local ip failed!", e);
}
String serverPort = config.getServerPort();
-
String hdfsWorkingDir = config.getHdfsWorkingDirectory();
-
- String sparkDriverHdfsLogPath = null;
- if (config instanceof KylinConfigExt) {
- Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
- if (Objects.nonNull(extendedOverrides)) {
- sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+ String log4jConfiguration;
+
+ if (isDriver) {
+ String sparkDriverHdfsLogPath;
+ if (config instanceof KylinConfigExt) {
+ Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+ if (Objects.nonNull(extendedOverrides)) {
+ sparkDriverHdfsLogPath = extendedOverrides.get("spark.driver.log4j.appender.hdfs.File");
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
+ }
}
- }
-
- /*
- if (config.isCloud()) {
- String logLocalWorkingDirectory = config.getLogLocalWorkingDirectory();
- if (StringUtils.isNotBlank(logLocalWorkingDirectory)) {
- hdfsWorkingDir = logLocalWorkingDirectory;
- sparkDriverHdfsLogPath = logLocalWorkingDirectory + sparkDriverHdfsLogPath;
+ log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
+ sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
+ sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
+ } else {
+ if (config instanceof KylinConfigExt) {
+ Map<String, String> extendedOverrides = ((KylinConfigExt) config).getExtendedOverrides();
+ if (Objects.nonNull(extendedOverrides)) {
+ sb.append(String.format(Locale.ROOT, " -Dkylin.spark.project=%s ", extendedOverrides.get("job.project")));
+ sb.append(String.format(Locale.ROOT, " -Dkylin.spark.jobName=%s ", extendedOverrides.get("job.stepId")));
+ sb.append(String.format(Locale.ROOT, " -Dkylin.spark.identifier=%s ", extendedOverrides.get("job.id")));
+ }
}
+ sb.append(String.format(Locale.ROOT, " -Dkylin.metadata.identifier=%s ", config.getMetadataUrlPrefix()));
}
- */
- String log4jConfiguration = "file:" + config.getLogSparkDriverPropertiesFile();
- sb.append(String.format(Locale.ROOT, " -Dlog4j.configuration=%s ", log4jConfiguration));
sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.enabled=%s ", config.isKerberosEnabled()));
-
if (config.isKerberosEnabled()) {
sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.principal=%s ", config.getKerberosPrincipal()));
sb.append(String.format(Locale.ROOT, " -Dkylin.kerberos.keytab=%s", config.getKerberosKeytabPath()));
@@ -354,16 +365,17 @@ public class NSparkExecutable extends AbstractExecutable {
}
}
sb.append(String.format(Locale.ROOT, " -Dkylin.hdfs.working.dir=%s ", hdfsWorkingDir));
- sb.append(String.format(Locale.ROOT, " -Dspark.driver.log4j.appender.hdfs.File=%s ", sparkDriverHdfsLogPath));
- sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.ip=%s ", serverIp));
- sb.append(String.format(Locale.ROOT, " -Dspark.driver.rest.server.port=%s ", serverPort));
- sb.append(String.format(Locale.ROOT, " -Dspark.driver.param.taskId=%s ", getId()));
- sb.append(String.format(Locale.ROOT, " -Dspark.driver.local.logDir=%s ", config.getKylinLogDir() + "/spark"));
- sparkConfigOverride.put(sparkDriverExtraJavaOptionsKey, sb.toString());
+
+ if (sparkConfigOverride.containsKey(sparkNodeExtraJavaOptionsKey)) {
+ sb.append(" ").append(sparkConfigOverride.get(sparkNodeExtraJavaOptionsKey));
+ }
+ logger.debug("Final property is set to <<{}>> for {} .", sb.toString(), sparkNodeExtraJavaOptionsKey);
+ // Here replace original Java options for Spark node
+ sparkConfigOverride.put(sparkNodeExtraJavaOptionsKey, sb.toString());
}
protected String generateSparkCmd(KylinConfig config, String hadoopConf, String jars, String kylinJobJar,
- String appArgs) {
+ String appArgs) {
StringBuilder sb = new StringBuilder();
sb.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.engine.spark.application.SparkEntry ");
@@ -377,11 +389,12 @@ public class NSparkExecutable extends AbstractExecutable {
if (sparkConfs.containsKey("spark.sql.hive.metastore.jars")) {
jars = jars + "," + sparkConfs.get("spark.sql.hive.metastore.jars");
}
-
+ sb.append("--files ").append(config.sparkUploadFiles()).append(" ");
sb.append("--name job_step_%s ");
sb.append("--jars %s %s %s");
String cmd = String.format(Locale.ROOT, sb.toString(), hadoopConf, KylinConfig.getSparkHome(), getId(), jars, kylinJobJar,
appArgs);
+ // SparkConf still has a chance to be changed in CubeBuildJob.java (Spark Driver)
logger.info("spark submit cmd: {}", cmd);
return cmd;
}
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
index fb09d38..7ae0937 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/KylinSession.scala
@@ -174,10 +174,10 @@ object KylinSession extends Logging {
if (sparkConf.get("spark.master").startsWith("yarn")) {
sparkConf.set("spark.yarn.dist.jars",
KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath)
- sparkConf.set("spark.yarn.dist.files", conf.sparderFiles())
+ sparkConf.set("spark.yarn.dist.files", conf.sparkUploadFiles())
} else {
sparkConf.set("spark.jars", conf.sparderJars)
- sparkConf.set("spark.files", conf.sparderFiles())
+ sparkConf.set("spark.files", conf.sparkUploadFiles())
}
val fileName = KylinConfig.getInstanceFromEnv.getKylinParquetJobJarPath
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index c28362c..cd63ac7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -20,12 +20,14 @@ package org.apache.kylin.rest.controller;
import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Locale;
+import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobTimeFilterEnum;
@@ -164,29 +166,20 @@ public class JobController extends BasicController {
/**
* Get a job step output
- *
- * @return
- * @throws IOException
*/
-
@RequestMapping(value = "/{jobId}/steps/{stepId}/output", method = { RequestMethod.GET }, produces = {
"application/json" })
@ResponseBody
public Map<String, String> getStepOutput(@PathVariable String jobId, @PathVariable String stepId) {
- Map<String, String> result = new HashMap<String, String>();
+ Map<String, String> result = new HashMap<>();
result.put("jobId", jobId);
result.put("stepId", String.valueOf(stepId));
- result.put("cmd_output", jobService.getJobOutput(jobId, stepId));
+ result.put("cmd_output", jobService.getJobStepOutput(jobId, stepId));
return result;
}
/**
- * Download a job step output from hdfs
- * @param jobId
- * @param stepId
- * @param project
- * @param response
- * @return
+ * Download a step output(Spark driver log) from hdfs
*/
@RequestMapping(value = "/{job_id:.+}/steps/{step_id:.+}/log", method = { RequestMethod.GET }, produces = { "application/json" })
@ResponseBody
@@ -196,10 +189,12 @@ public class JobController extends BasicController {
checkRequiredArg("job_id", jobId);
checkRequiredArg("step_id", stepId);
checkRequiredArg("project", project);
- String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", project, stepId);
+ String validatedPrj = CliCommandExecutor.checkParameter(project);
+ String validatedStepId = CliCommandExecutor.checkParameter(stepId);
+ String downloadFilename = String.format(Locale.ROOT, "%s_%s.log", validatedPrj, validatedStepId);
- String jobOutput = jobService.getAllJobOutput(jobId, stepId);
- setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes("UTF-8")), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
+ String jobOutput = jobService.getAllJobStepOutput(jobId, stepId);
+ setDownloadResponse(new ByteArrayInputStream(jobOutput.getBytes(StandardCharsets.UTF_8)), downloadFilename, MediaType.APPLICATION_OCTET_STREAM_VALUE, response);
return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 2c0c20c..90ee782 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -480,12 +480,12 @@ public class JobService extends BasicService implements InitializingBean {
return getExecutableManager().getOutput(id);
}
- public String getJobOutput(String jobId, String stepId) {
+ public String getJobStepOutput(String jobId, String stepId) {
ExecutableManager executableManager = getExecutableManager();
return executableManager.getOutputFromHDFSByJobId(jobId, stepId).getVerboseMsg();
}
- public String getAllJobOutput(String jobId, String stepId) {
+ public String getAllJobStepOutput(String jobId, String stepId) {
ExecutableManager executableManager = getExecutableManager();
return executableManager.getOutputFromHDFSByJobId(jobId, stepId, Integer.MAX_VALUE).getVerboseMsg();
}