You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:49 UTC

[kylin] 02/06: KYLIN-4813 Add spark executor log4j

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c0b27873f7ff6cfcaa4d404214c41288bd81a4f4
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Thu Nov 19 10:24:18 2020 +0800

    KYLIN-4813 Add spark executor log4j
    
    KYLIN-4813 Minor fix
---
 build/conf/spark-executor-log4j.properties         |  46 ++++
 .../org/apache/kylin/common/KylinConfigBase.java   |   6 +
 .../src/main/resources/kylin-defaults.properties   |   2 +
 .../kylin/job/execution/AbstractExecutable.java    |   2 +-
 .../kylin/job/execution/ExecutableManager.java     |   1 -
 .../common/logging/SparkExecutorHdfsAppender.java  | 243 +++++++++++++++++++++
 6 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/build/conf/spark-executor-log4j.properties b/build/conf/spark-executor-log4j.properties
new file mode 100644
index 0000000..7cc5b04
--- /dev/null
+++ b/build/conf/spark-executor-log4j.properties
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# It's called spark-executor-log4j.properties so that it won't distract users from the other more important log4j config file: kylin-server-log4j.properties
+# enable this by -Dlog4j.configuration=spark-executor-log4j.properties
+log4j.rootLogger=INFO,stderr,hdfs
+
+log4j.appender.stderr=org.apache.log4j.ConsoleAppender
+log4j.appender.stderr.layout=org.apache.kylin.common.logging.SensitivePatternLayout
+log4j.appender.stderr.target=System.err
+#Don't add line number (%L) as it's too costly!
+log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
+
+
+log4j.appender.hdfs=org.apache.kylin.engine.spark.common.logging.SparkExecutorHdfsLogAppender
+
+log4j.appender.hdfs.hdfsWorkingDir=${kylin.hdfs.working.dir}
+log4j.appender.hdfs.metadataIdentifier=${kylin.metadata.identifier}
+log4j.appender.hdfs.category=${kylin.spark.category}
+log4j.appender.hdfs.identifier=${kylin.spark.identifier}
+log4j.appender.hdfs.jobName=${kylin.spark.jobName}
+log4j.appender.hdfs.project=${kylin.spark.project}
+
+log4j.appender.hdfs.rollingPeriod=5
+log4j.appender.hdfs.logQueueCapacity=5000
+#flushPeriod count as millis
+log4j.appender.hdfs.flushInterval=5000
+
+log4j.appender.hdfs.layout=org.apache.kylin.engine.spark.common.logging.SensitivePatternLayout
+#Don't add line number (%L) as it's too costly!
+log4j.appender.hdfs.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n
\ No newline at end of file
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 081d23f..e2727fa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -260,6 +260,7 @@ public abstract class KylinConfigBase implements Serializable {
         this.properties = BCC.check(properties);
         setProperty("kylin.metadata.url.identifier", getMetadataUrlPrefix());
         setProperty("kylin.log.spark-driver-properties-file", getLogSparkDriverPropertiesFile());
+        setProperty("kylin.log.spark-executor-properties-file", getLogSparkExecutorPropertiesFile());
     }
 
     private Map<Integer, String> convertKeyToInteger(Map<String, String> map) {
@@ -2829,6 +2830,11 @@ public abstract class KylinConfigBase implements Serializable {
         return getLogPropertyFile("spark-driver-log4j.properties");
     }
 
+    public String getLogSparkExecutorPropertiesFile() {
+        return getLogPropertyFile("spark-executor-log4j.properties");
+    }
+
+
     private String getLogPropertyFile(String filename) {
         if (isDevEnv()) {
             return Paths.get(getKylinHomeWithoutWarn(), "build", "conf").toString() + File.separator + filename;
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 3d8268f..8fba3bc 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -259,6 +259,7 @@ kylin.engine.spark-conf.spark.eventLog.enabled=true
 kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
 kylin.engine.spark-conf.spark.hadoop.yarn.timeline-service.enabled=false
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=job -Dkylin.spark.project=${job.project} -Dkylin.spark.identifier=${job.id} -Dkylin.spark.jobName=${job.stepId} -Duser.timezone=${user.timezone}
 #kylin.engine.spark-conf.spark.sql.shuffle.partitions=1
 
 # manually upload spark-assembly jar to HDFS and then set this property will avoid repeatedly uploading jar at runtime
@@ -286,6 +287,7 @@ kylin.query.spark-conf.spark.serializer=org.apache.spark.serializer.JavaSerializ
 #kylin.query.spark-conf.spark.sql.shuffle.partitions=40
 #kylin.query.spark-conf.spark.yarn.jars=hdfs://localhost:9000/spark2_jars/*
 
+kylin.storage.columnar.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current -Dlog4j.configuration=spark-executor-log4j.properties -Dlog4j.debug -Dkylin.hdfs.working.dir=${kylin.env.hdfs-working-dir} -Dkylin.metadata.identifier=${kylin.metadata.url.identifier} -Dkylin.spark.category=sparder -Dkylin.spark.project=${job.project} -XX:MaxDirectMemorySize=896M
 # uncomment for HDP
 #kylin.query.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
 #kylin.query.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index d815e5b..a2fe7e5 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -596,7 +596,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     public final String getProject() {
         if (project == null) {
-            logger.error("project is not set for abstract executable " + getId());
+            throw new IllegalStateException("project is not set for abstract executable " + getId());
         }
         return project;
     }
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index c9ae8a4..590276d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -382,7 +382,6 @@ public class ExecutableManager {
 
     /**
      * get the last N_LINES lines from the end of hdfs file input stream;
-     * reference: https://olapio.atlassian.net/wiki/spaces/PD/pages/1306918958
      *
      * @param hdfsDin
      * @param startPos
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
new file mode 100644
index 0000000..4a76022
--- /dev/null
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -0,0 +1,243 @@
+package org.apache.kylin.engine.spark.common.logging;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.log4j.helpers.LogLog;
+import org.apache.log4j.spi.LoggingEvent;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.UUID;
+
+public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
+
+    private static final long A_DAY_MILLIS = 24 * 60 * 60 * 1000L;
+    private static final long A_HOUR_MILLIS = 60 * 60 * 1000L;
+    private SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd", Locale.getDefault());
+    private SimpleDateFormat hourFormat = new SimpleDateFormat("HH");
+
+    @VisibleForTesting
+    String outPutPath;
+    @VisibleForTesting
+    String executorId;
+
+    @VisibleForTesting
+    long startTime = 0;
+    @VisibleForTesting
+    boolean rollingByHour = false;
+    @VisibleForTesting
+    int rollingPeriod = 5;
+
+    //log appender configurable
+    private String metadataIdentifier;
+    private String category;
+
+    private String identifier;
+
+    // only cubing job
+    private String jobName;
+    private String project;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getJobName() {
+        return jobName;
+    }
+
+    public void setJobName(String jobName) {
+        this.jobName = jobName;
+    }
+
+    public void setIdentifier(String identifier) {
+        this.identifier = identifier;
+    }
+
+    public String getIdentifier() {
+        return identifier;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setMetadataIdentifier(String metadataIdentifier) {
+        this.metadataIdentifier = metadataIdentifier;
+    }
+
+    public String getMetadataIdentifier() {
+        return metadataIdentifier;
+    }
+
+    @Override
+    void init() {
+        if (StringUtils.isBlank(this.identifier)) {
+            this.identifier = YarnSparkHadoopUtil.getContainerId().getApplicationAttemptId().getApplicationId()
+                    .toString();
+        }
+
+        LogLog.warn("metadataIdentifier -> " + getMetadataIdentifier());
+        LogLog.warn("category -> " + getCategory());
+        LogLog.warn("identifier -> " + getIdentifier());
+
+        if (null != getProject()) {
+            LogLog.warn("project -> " + getProject());
+        }
+
+        if (null != getJobName()) {
+            LogLog.warn("jobName -> " + getJobName());
+        }
+    }
+
+    @Override
+    String getAppenderName() {
+        return "SparkExecutorHdfsLogAppender";
+    }
+
+    @Override
+    boolean isSkipCheckAndFlushLog() {
+        if (SparkEnv.get() == null && StringUtils.isBlank(executorId)) {
+            LogLog.warn("Waiting for spark executor to start");
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                LogLog.error("Waiting for spark executor starting is interrupted!", e);
+                Thread.currentThread().interrupt();
+            }
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    void doWriteLog(int size, List<LoggingEvent> transaction) throws IOException, InterruptedException {
+        while (size > 0) {
+            final LoggingEvent loggingEvent = getLogBufferQue().take();
+            if (isTimeChanged(loggingEvent)) {
+                updateOutPutDir(loggingEvent);
+
+                final Path file = new Path(outPutPath);
+
+                String sparkuser = System.getenv("SPARK_USER");
+                String user = System.getenv("USER");
+                LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
+                        + " USER is " + user);
+                if (!initHdfsWriter(file, new Configuration())) {
+                    LogLog.error("Failed to init the hdfs writer!");
+                }
+                doRollingClean(loggingEvent);
+            }
+
+            transaction.add(loggingEvent);
+            writeLogEvent(loggingEvent);
+            size--;
+        }
+    }
+
+    @VisibleForTesting
+    void updateOutPutDir(LoggingEvent event) {
+        if (rollingByHour) {
+            String rollingDir = dateFormat.format(new Date(event.getTimeStamp())) + "/"
+                    + hourFormat.format(new Date(event.getTimeStamp()));
+            outPutPath = getOutPutDir(rollingDir);
+        } else {
+            String rollingDir = dateFormat.format(new Date(event.getTimeStamp()));
+            outPutPath = getOutPutDir(rollingDir);
+        }
+    }
+
+    private String getOutPutDir(String rollingDir) {
+        if (StringUtils.isBlank(executorId)) {
+            executorId = SparkEnv.get() != null ? SparkEnv.get().executorId() : UUID.randomUUID().toString();
+            LogLog.warn("executorId set to " + executorId);
+        }
+
+        if ("job".equals(getCategory())) {
+            return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + getJobName() + "/" + "executor-"
+                    + executorId + ".log";
+        }
+        return getRootPathName() + "/" + rollingDir + "/" + getIdentifier() + "/" + "executor-" + executorId + ".log";
+    }
+
+    @VisibleForTesting
+    void doRollingClean(LoggingEvent event) throws IOException {
+        FileSystem fileSystem = getFileSystem();
+
+        String rootPathName = getRootPathName();
+        Path rootPath = new Path(rootPathName);
+
+        if (!fileSystem.exists(rootPath))
+            return;
+
+        FileStatus[] logFolders = fileSystem.listStatus(rootPath);
+
+        if (logFolders == null)
+            return;
+
+        String thresholdDay = dateFormat.format(new Date(event.getTimeStamp() - A_DAY_MILLIS * rollingPeriod));
+
+        for (FileStatus fs : logFolders) {
+            String fileName = fs.getPath().getName();
+            if (fileName.compareTo(thresholdDay) < 0) {
+                Path fullPath = new Path(rootPathName + File.separator + fileName);
+                if (!fileSystem.exists(fullPath))
+                    continue;
+                fileSystem.delete(fullPath, true);
+            }
+        }
+    }
+
+    @VisibleForTesting
+    String getRootPathName() {
+        if ("job".equals(getCategory())) {
+            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs";
+        } else if ("sparder".equals(getCategory())) {
+            return parseHdfsWordingDir() + "/_sparder_logs";
+        } else {
+            throw new IllegalArgumentException("illegal category: " + getCategory());
+        }
+    }
+
+    @VisibleForTesting
+    boolean isTimeChanged(LoggingEvent event) {
+        if (rollingByHour) {
+            return isNeedRolling(event, A_HOUR_MILLIS);
+        } else {
+            return isNeedRolling(event, A_DAY_MILLIS);
+        }
+    }
+
+    private boolean isNeedRolling(LoggingEvent event, Long timeInterval) {
+        if (0 == startTime || ((event.getTimeStamp() / timeInterval) - (startTime / timeInterval)) > 0) {
+            startTime = event.getTimeStamp();
+            return true;
+        }
+        return false;
+    }
+
+    private String parseHdfsWordingDir() {
+        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/")
+                + StringUtils.replace(getMetadataIdentifier(), "/", "-");
+    }
+}
+