You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2021/03/01 06:44:00 UTC

[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4917 Fix some problem of logger system in kylin4

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
     new 88ad6ec  KYLIN-4917 Fix some problem of logger system in kylin4
88ad6ec is described below

commit 88ad6ec31b1d5ba0308bebd96d9c65d93935d3d8
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Feb 24 17:25:30 2021 +0800

    KYLIN-4917 Fix some problem of logger system in kylin4
---
 .../common/logging/AbstractHdfsLogAppender.java    | 10 ++--
 .../common/logging/SparkExecutorHdfsAppender.java  | 54 ++++++++++++++++++----
 .../apache/kylin/rest/job/StorageCleanupJob.java   |  2 +-
 3 files changed, 53 insertions(+), 13 deletions(-)

diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
index ff5240c..17422a8 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/AbstractHdfsLogAppender.java
@@ -336,7 +336,9 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
      * @throws IOException
      */
     protected void write(String message) throws IOException {
-        bufferedWriter.write(message);
+        if (isWriterInited()) {
+            bufferedWriter.write(message);
+        }
     }
 
     /**
@@ -373,8 +375,10 @@ public abstract class AbstractHdfsLogAppender extends AppenderSkeleton {
      * @throws IOException
      */
     private void flush() throws IOException {
-        bufferedWriter.flush();
-        outStream.hsync();
+        if (isWriterInited()) {
+            bufferedWriter.flush();
+            outStream.hsync();
+        }
     }
 
     /**
diff --git a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
index b2e4146..0bbd3e7 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
+++ b/kylin-spark-project/kylin-spark-common/src/main/java/org/apache/kylin/engine/spark/common/logging/SparkExecutorHdfsAppender.java
@@ -25,10 +25,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.log4j.helpers.LogLog;
 import org.apache.log4j.spi.LoggingEvent;
 import org.apache.spark.SparkEnv;
+import org.apache.spark.deploy.SparkHadoopUtil;
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil;
+import scala.runtime.BoxedUnit;
 
 import java.io.File;
 import java.io.IOException;
@@ -160,12 +163,21 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
                 String user = System.getenv("USER");
                 LogLog.warn("login user is " + UserGroupInformation.getLoginUser() + " SPARK_USER is " + sparkuser
                         + " USER is " + user);
-                if (!initHdfsWriter(file, new Configuration())) {
-                    LogLog.error("Failed to init the hdfs writer!");
-                }
-                doRollingClean(loggingEvent);
+                SparkHadoopUtil.get().runAsSparkUser(new scala.runtime.AbstractFunction0<scala.runtime.BoxedUnit>() {
+                    @Override
+                    public BoxedUnit apply() {
+                        if (!initHdfsWriter(file, new Configuration())) {
+                            LogLog.error("Failed to init the hdfs writer!");
+                        }
+                        try {
+                            doRollingClean(loggingEvent);
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                        return null;
+                    }
+                });
             }
-
             transaction.add(loggingEvent);
             writeLogEvent(loggingEvent);
             size--;
@@ -229,9 +241,9 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
     @VisibleForTesting
     String getRootPathName() {
         if ("job".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/" + getProject() + "/spark_logs/executor/";
+            return getHdfsWorkingDir() + "/" + getProject() + "/spark_logs/executor/";
         } else if ("sparder".equals(getCategory())) {
-            return parseHdfsWordingDir() + "/_sparder_logs";
+            return parseHdfsWorkingDir() + "/_sparder_logs";
         } else {
             throw new IllegalArgumentException("illegal category: " + getCategory());
         }
@@ -254,8 +266,32 @@ public class SparkExecutorHdfsAppender extends AbstractHdfsLogAppender {
         return false;
     }
 
-    private String parseHdfsWordingDir() {
-        return StringUtils.appendIfMissing(getHdfsWorkingDir(), "/");
+    private String parseHdfsWorkingDir() {
+        String root = getHdfsWorkingDir();
+        Path path = new Path(root);
+        if (!path.isAbsolute())
+            throw new IllegalArgumentException("kylin.env.hdfs-working-dir must be absolute, but got " + root);
+
+        try {
+            FileSystem fs = path.getFileSystem(HadoopUtil.getCurrentConfiguration());
+            path = fs.makeQualified(path);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        // append metadata-url prefix
+        String metaId = getMetadataIdentifier().replace(':', '-');
+        //transform relative path for local metadata
+        if (metaId.startsWith("../")) {
+            metaId = metaId.replace("../", "");
+            metaId = metaId.replace('/', '-');
+        }
+
+        root = new Path(path, metaId).toString();
+
+        if (!root.endsWith("/"))
+            root += "/";
+        return root;
     }
 }
 
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
index 2b7569e..f4ad269 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/StorageCleanupJob.java
@@ -82,7 +82,7 @@ public class StorageCleanupJob extends AbstractApplication {
 
     protected long storageTimeCut;
 
-    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc");
+    protected static final List<String> protectedDir = Arrays.asList("cube_statistics", "resources-jdbc", "_sparder_logs");
     protected static PathFilter pathFilter = status -> !protectedDir.contains(status.getName());
 
     public StorageCleanupJob() throws IOException {