You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:49 UTC
[kylin] 02/06: KYLIN-4813 Add spark executor log4j
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c0b27873f7ff6cfcaa4d404214c41288bd81a4f4
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Nov 19 10:24:18 2020 +0800
KYLIN-4813 Add spark executor log4j
KYLIN-4813 Minor fix
---
build/conf/spark-executor-log4j.properties | 46 ++++
.../org/apache/kylin/common/KylinConfigBase.java | 6 +
.../src/main/resources/kylin-defaults.properties | 2 +
.../kylin/job/execution/AbstractExecutable.java | 2 +-
.../kylin/job/execution/ExecutableManager.java | 1 -
.../common/logging/SparkExecutorHdfsAppender.java | 243 +++++++++++++++++++++
6 files changed, 298 insertions(+), 2 deletions(-)
diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
new file mode 100644
index 0000000..7cc5b04
--- /dev/null
+++ b/build/conf/spark-executor-log4j.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# It's called spark-executor-log4j.properties so that it won't distract users from the other more important log4j config file: kylin-server-log4j.properties
+# enable this by -Dlog4j.configuration=spark-executor-log4j.properties
+log4j.rootLogger=INFO,stderr,hdfs
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.stderr.target=System.err
+#Don't add line number (%L) as it's too costly!
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+
+# The value must be the fully qualified name of the appender class in kylin-spark-common,
+# which is declared as SparkExecutorHdfsAppender (file SparkExecutorHdfsAppender.java).
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsAppender
+
+# These placeholders are resolved from JVM system properties (-Dkylin.hdfs.working.dir=... etc.)
+# passed through spark.executor.extraJavaOptions.
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
+log4j.appender.hdfs.category=${kylin.spark.category}
+log4j.appender.hdfs.identifier=${kylin.spark.identifier}
+log4j.appender.hdfs.jobName=${kylin.spark.jobName}
+log4j.appender.hdfs.project=${kylin.spark.project}
+
+# rollingPeriod is the number of days of log folders to keep
+log4j.appender.hdfs.rollingPeriod=5
+log4j.appender.hdfs.logQueueCapacity=5000
+# flushInterval is in milliseconds
+log4j.appender.hdfs.flushInterval=5000
+
+# NOTE(review): the stderr appender above loads SensitivePatternLayout from
+# org.apache.kylin.common.logging, while this one uses org.apache.kylin.engine.spark.common.logging.
+# Confirm which package really contains the class and make both entries agree.
+log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 081d23f..e2727fa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -260,6 +260,7 @@ public abstract class KylinConfigBase implements Serializable {
this.properties = BCC.check(properties);
setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
+ setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
}
private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -2829,6 +2830,11 @@ public abstract class KylinConfigBase implements Serializable {
return getLogPropertyFile("spark-driver-log4j.properties");
}
+ /**
+  * Resolves the location of the log4j configuration file meant for Spark executors.
+  *
+  * @return the result of {@code getLogPropertyFile} for {@code spark-executor-log4j.properties}
+  */
+ public String getLogSparkExecutorPropertiesFile() {
+     final String executorLog4jFileName = "spark-executor-log4j.properties";
+     return getLogPropertyFile(executorLog4jFileName);
+ }
+
+
private String getLogPropertyFile(String filename) {
if (isDevEnv()) {
return Paths.get(getKylinHomeWithoutWarn(), "build", "conf").toString() + File.separator + filename;
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 3d8268f..8fba3bc 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,6 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
#kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
# manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -286,6 +287,7 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
#kylin.query.spark-conf.spark.sql.shuffle.partitions=40
#kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
# uncomment for HDP
#kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
#kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d815e5b..a2fe7e5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -596,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
public final String getProject() {
if (project == null) {
- logger.error("project is not set for abstract executable " + getId());
+ throw new IllegalStateException("project is not set for abstract executable " + getId());
}
return project;
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index c9ae8a4..590276d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -382,7 +382,6 @@ public class ExecutableManager {
/**
* get the last N_LINES lines from the end of hdfs file input stream;
- * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
*
* @param hdfsDin
* @param startPos
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
new file mode 100644
index 0000000..4a76022
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -0,0 +1,243 @@
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+
+/**
+ * Log4j appender for Spark executors that writes log events to a file on HDFS.
+ *
+ * <p>The target file sits below a category-specific root directory (see
+ * {@link #getRootPathName()}) in a folder named after the event date, with an extra hour level
+ * when {@code rollingByHour} is set. Each time the appender switches to a new folder it also
+ * removes root-level entries whose name sorts before the date {@code rollingPeriod} days ago.
+ *
+ * <p>The event queue, the HDFS writer and the file system handle used here are inherited from
+ * {@link AbstractHdfsLogAppender}.
+ */
+public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
+
+    private static final long A_DAY_MILLIS = 24 * 60 * 60 * 1000L;
+    private static final long A_HOUR_MILLIS = 60 * 60 * 1000L;
+
+    // Folder-name formatters; both work in the JVM default time zone. SimpleDateFormat is not
+    // thread-safe. NOTE(review): confirm these are only ever reached from a single thread.
+    private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault());
+    private final SimpleDateFormat hourFormat = new SimpleDateFormat("HH");
+
+    /** Full path of the log file currently written to; set by {@link #updateOutPutDir}. */
+    @VisibleForTesting
+    String outPutPath;
+    /** Executor id used in the log file name; resolved lazily on the first path computation. */
+    @VisibleForTesting
+    String executorId;
+
+    /** Timestamp (epoch millis) of the event that caused the latest roll-over; 0 before any event. */
+    @VisibleForTesting
+    long startTime = 0;
+    /** When true the output folder changes every hour, otherwise every day. */
+    @VisibleForTesting
+    boolean rollingByHour = false;
+    /** Number of days of dated log folders that survive {@link #doRollingClean}. */
+    @VisibleForTesting
+    int rollingPeriod = 5;
+
+    //log appender configurable
+    private String metadataIdentifier;
+    private String category;
+
+    private String identifier;
+
+    // only cubing job
+    private String jobName;
+    private String project;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public void setIdentifier(String identifier) {
+        this.identifier = identifier;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setMetadataIdentifier(String metadataIdentifier) {
+        this.metadataIdentifier = metadataIdentifier;
+    }
+
+    public String getMetadataIdentifier() {
+        return metadataIdentifier;
+    }
+
+    /**
+     * Falls back to the YARN application id of the current container when no identifier was
+     * configured, then reports the effective settings through {@link LogLog}.
+     */
+    @Override
+    void init() {
+        if (StringUtils.isBlank(this.identifier)) {
+            this.identifier = YarnSparkHadoopUtil.getContainerId().getApplicationAttemptId().getApplicationId()
+                    .toString();
+        }
+
+        LogLog.warn("metadataIdentifier -> " + getMetadataIdentifier());
+        LogLog.warn("category -> " + getCategory());
+        LogLog.warn("identifier -> " + getIdentifier());
+
+        if (null != getProject()) {
+            LogLog.warn("project -> " + getProject());
+        }
+
+        if (null != getJobName()) {
+            LogLog.warn("jobName -> " + getJobName());
+        }
+    }
+
+    @Override
+    String getAppenderName() {
+        return "SparkExecutorHdfsLogAppender";
+    }
+
+    /**
+     * Tells the caller to skip this flush round while the executor is not up yet.
+     *
+     * @return {@code true}, after sleeping one second, when {@link SparkEnv} is not available
+     *         and no executor id has been resolved so far; {@code false} otherwise
+     */
+    @Override
+    boolean isSkipCheckAndFlushLog() {
+        if (SparkEnv.get() == null && StringUtils.isBlank(executorId)) {
+            LogLog.warn("Waiting for spark executor to start");
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LogLog.error("Waiting for spark executor starting is interrupted!", e);
+                // keep the interrupt visible to whoever owns this thread
+                Thread.currentThread().interrupt();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Takes {@code size} events from the buffer queue, records each in {@code transaction} and
+     * writes it out. When an event falls into a new rolling period the output path is
+     * recomputed, the HDFS writer is re-initialised for it and expired folders are removed
+     * before the event is written.
+     *
+     * @param size number of events to take from the queue
+     * @param transaction list that receives every event taken from the queue
+     * @throws IOException if the rolling clean-up or a write fails
+     * @throws InterruptedException if interrupted while waiting on the queue
+     */
+    @Override
+    void doWriteLog(int size, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+        for (int remaining = size; remaining > 0; remaining--) {
+            final LoggingEvent loggingEvent = getLogBufferQue().take();
+            if (isTimeChanged(loggingEvent)) {
+                updateOutPutDir(loggingEvent);
+
+                final Path file = new Path(outPutPath);
+
+                String sparkuser = System.getenv("SPARK_USER");
+                String user = System.getenv("USER");
+                LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
+                        + " USER is " + user);
+                if (!initHdfsWriter(file, new Configuration())) {
+                    LogLog.error("Failed to init the hdfs writer!");
+                }
+                doRollingClean(loggingEvent);
+            }
+
+            transaction.add(loggingEvent);
+            writeLogEvent(loggingEvent);
+        }
+    }
+
+    /**
+     * Recomputes {@link #outPutPath} for the folder that matches the event's timestamp:
+     * {@code yyyy-MM-dd}, or {@code yyyy-MM-dd/HH} when rolling by hour.
+     */
+    @VisibleForTesting
+    void updateOutPutDir(LoggingEvent event) {
+        final Date eventTime = new Date(event.getTimeStamp());
+        String rollingDir = dateFormat.format(eventTime);
+        if (rollingByHour) {
+            rollingDir = rollingDir + "/" + hourFormat.format(eventTime);
+        }
+        outPutPath = getOutPutDir(rollingDir);
+    }
+
+    /**
+     * Builds the log file path below the given rolling folder. The executor id is taken from
+     * {@link SparkEnv} the first time it is needed, or replaced by a random UUID when
+     * {@link SparkEnv} is not available. For the {@code job} category the job name is added as
+     * an extra directory level.
+     */
+    private String getOutPutDir(String rollingDir) {
+        if (StringUtils.isBlank(executorId)) {
+            executorId = SparkEnv.get() != null ? SparkEnv.get().executorId() : UUID.randomUUID().toString();
+            LogLog.warn("executorId set to " + executorId);
+        }
+
+        if ("job".equals(getCategory())) {
+            return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + getJobName() + "/" + "executor-"
+                    + executorId + ".log";
+        }
+        return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + "executor-" + executorId + ".log";
+    }
+
+    /**
+     * Deletes, recursively, every direct child of the root log directory whose name sorts
+     * before the date lying {@code rollingPeriod} days ahead of the event. Does nothing when
+     * the root directory does not exist.
+     *
+     * @param event event whose timestamp is the reference point for the retention window
+     * @throws IOException if listing or deleting on the file system fails
+     */
+    @VisibleForTesting
+    void doRollingClean(LoggingEvent event) throws IOException {
+        FileSystem fileSystem = getFileSystem();
+
+        Path rootPath = new Path(getRootPathName());
+
+        if (!fileSystem.exists(rootPath))
+            return;
+
+        FileStatus[] logFolders = fileSystem.listStatus(rootPath);
+
+        if (logFolders == null)
+            return;
+
+        String thresholdDay = dateFormat.format(new Date(event.getTimeStamp() - A_DAY_MILLIS * rollingPeriod));
+
+        for (FileStatus fs : logFolders) {
+            String fileName = fs.getPath().getName();
+            // folder names are yyyy-MM-dd, so lexical order equals chronological order
+            if (fileName.compareTo(thresholdDay) < 0) {
+                // Resolve the child through Path: HDFS paths always use "/", whereas
+                // File.separator depends on the local platform.
+                Path fullPath = new Path(rootPath, fileName);
+                // the entry may have disappeared since it was listed
+                if (!fileSystem.exists(fullPath))
+                    continue;
+                fileSystem.delete(fullPath, true);
+            }
+        }
+    }
+
+    /**
+     * Returns the root directory for this appender's logs: a per-project {@code spark_logs}
+     * folder for the {@code job} category, {@code _sparder_logs} for {@code sparder}.
+     *
+     * @throws IllegalArgumentException for any other category
+     */
+    @VisibleForTesting
+    String getRootPathName() {
+        if ("job".equals(getCategory())) {
+            return parseHdfsWorkingDir() + "/" + getProject() + "/spark_logs";
+        } else if ("sparder".equals(getCategory())) {
+            return parseHdfsWorkingDir() + "/_sparder_logs";
+        } else {
+            throw new IllegalArgumentException("illegal category: " + getCategory());
+        }
+    }
+
+    /**
+     * Checks whether the event belongs to a later rolling period (hour or day) than the last
+     * roll-over. Note the side effect: when it returns {@code true}, {@link #startTime} is
+     * moved to the event's timestamp.
+     */
+    @VisibleForTesting
+    boolean isTimeChanged(LoggingEvent event) {
+        return isNeedRolling(event, rollingByHour ? A_HOUR_MILLIS : A_DAY_MILLIS);
+    }
+
+    /**
+     * Decides whether to roll over, and records the event time as the new period start if so.
+     * The very first event ({@code startTime == 0}) always rolls.
+     */
+    private boolean isNeedRolling(LoggingEvent event, long timeInterval) {
+        final long eventTime = event.getTimeStamp();
+        if (0 == startTime || periodIndex(eventTime, timeInterval) > periodIndex(startTime, timeInterval)) {
+            startTime = eventTime;
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Index of the hour/day the timestamp falls into, counted in the same time zone the folder
+     * names are formatted in. Without the offset the period would change at UTC boundaries
+     * while the folder name changes at local ones, so events would land in the wrong folder.
+     */
+    private long periodIndex(long timestamp, long timeInterval) {
+        return (timestamp + dateFormat.getTimeZone().getOffset(timestamp)) / timeInterval;
+    }
+
+    /** HDFS working dir (with a trailing "/") followed by the metadata identifier, "/" replaced by "-". */
+    private String parseHdfsWorkingDir() {
+        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
+                + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+    }
+}
+