You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/01 06:44:00 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4917 Fix some problem of logger system in kylin4
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new 88ad6ec KYLIN-4917 Fix some problem of logger system in kylin4
88ad6ec is described below
commit 88ad6ec31b1d5ba0308bebd96d9c65d93935d3d8
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Feb 24 17:25:30 2021 +0800
KYLIN-4917 Fix some problem of logger system in kylin4
---
.../common/logging/AbstractHdfsLogAppender.java | 10 ++--
.../common/logging/SparkExecutorHdfsAppender.java | 54 ++++++++++++++++++----
.../apache/kylin/rest/job/StorageCleanupJob.java | 2 +-
3 files changed, 53 insertions(+), 13 deletions(-)
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index ff5240c..17422a8 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -336,7 +336,9 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
* @throws IOException
*/
protected void write(String message) throws IOException {
- bufferedWriter.write(message);
+ if (isWriterInited()) {
+ bufferedWriter.write(message);
+ }
}
/**
@@ -373,8 +375,10 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
* @throws IOException
*/
private void flush() throws IOException {
- bufferedWriter.flush();
- outStream.hsync();
+ if (isWriterInited()) {
+ bufferedWriter.flush();
+ outStream.hsync();
+ }
}
/**
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index b2e4146..0bbd3e7 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.log4j.helpers.LogLog;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+import scala.runtime.BoxedUnit;
import java.io.File;
import java.io.IOException;
@@ -160,12 +163,21 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
String user = System.getenv("USER");
LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
+ " USER is " + user);
- if (!initHdfsWriter(file, new Configuration())) {
- LogLog.error("Failed to init the hdfs writer!");
- }
- doRollingClean(loggingEvent);
+ SparkHadoopUtil.get().runAsSparkUser(new scala.runtime.AbstractFunction0<scala.runtime.BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ if (!initHdfsWriter(file, new Configuration())) {
+ LogLog.error("Failed to init the hdfs writer!");
+ }
+ try {
+ doRollingClean(loggingEvent);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+ });
}
-
transaction.add(loggingEvent);
writeLogEvent(loggingEvent);
size--;
@@ -229,9 +241,9 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
@VisibleForTesting
String getRootPathName() {
if ("job".equals(getCategory())) {
- return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs/executor/";
+ return getHdfsWorkingDir() + "/" + getProject() + "/spark_logs/executor/";
} else if ("sparder".equals(getCategory())) {
- return parseHdfsWordingDir() + "/_sparder_logs";
+ return parseHdfsWorkingDir() + "/_sparder_logs";
} else {
throw new IllegalArgumentException("illegal category: " + getCategory());
}
@@ -254,8 +266,32 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
return false;
}
- private String parseHdfsWordingDir() {
- return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/");
+ private String parseHdfsWorkingDir() {
+ String root = getHdfsWorkingDir();
+ Path path = new Path(root);
+ if (!path.isAbsolute())
+ throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
+
+ try {
+ FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+ path = fs.makeQualified(path);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ // append metadata-url prefix
+ String metaId = getMetadataIdentifier().replace(':', '-');
+ //transform relative path for local metadata
+ if (metaId.startsWith("../")) {
+ metaId = metaId.replace("../", "");
+ metaId = metaId.replace('/', '-');
+ }
+
+ root = new Path(path, metaId).toString();
+
+ if (!root.endsWith("/"))
+ root += "/";
+ return root;
}
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 2b7569e..f4ad269 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -82,7 +82,7 @@ public class StorageCleanupJob extends AbstractApplication {
protected long storageTimeCut;
- protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc");
+ protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs");
protected static PathFilter pathFilter = status -> !protectedDir.contains(status.getName());
public StorageCleanupJob() throws IOException {