Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/03/15 13:14:50 UTC

[incubator-dolphinscheduler] branch dev updated: Cache HadoopUtils instance with specific days expire time (#2181)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new e0fc317  Cache HadoopUtils instance with specific days expire time (#2181)
e0fc317 is described below

commit e0fc3171da3b731433a4e8da3403070323007cce
Author: tswstarplanet <ts...@apache.org>
AuthorDate: Sun Mar 15 21:14:43 2020 +0800

    Cache HadoopUtils instance with specific days expire time (#2181)
    
    * Cache HadoopUtils instance with 7 days expire time
    
    * solve sonar issue
    
    * add kerberos expire time config
    
    * move KERBEROS_EXPIRE_TIME to Constants.java
---
 .../apache/dolphinscheduler/common/Constants.java  |   5 +
 .../dolphinscheduler/common/utils/HadoopUtils.java | 234 +++++++++++----------
 .../src/main/resources/common.properties           |   2 +-
 3 files changed, 124 insertions(+), 117 deletions(-)

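The core of this change is that the eagerly constructed HadoopUtils singleton is replaced by a one-entry Guava LoadingCache whose value expires a configurable number of days after it is written, so a kerberos-authenticated instance is rebuilt on a schedule instead of a new instance being created on every getInstance() call when kerberos is enabled (the behaviour removed by this diff). A minimal, self-contained sketch of the same pattern follows; the class name, key and default below are illustrative only and are not taken from the repository:

    import java.util.concurrent.TimeUnit;

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;

    // Illustrative sketch of the expiring-singleton pattern used by this commit.
    public class ExpiringSingleton {

        private static final String KEY = "SINGLETON_KEY";

        // expireAfterWrite evicts the entry a fixed time after it was loaded; the
        // next getUnchecked() call then invokes load() and builds a fresh instance.
        private static final LoadingCache<String, ExpiringSingleton> CACHE = CacheBuilder
                .newBuilder()
                .expireAfterWrite(7, TimeUnit.DAYS)
                .build(new CacheLoader<String, ExpiringSingleton>() {
                    @Override
                    public ExpiringSingleton load(String key) {
                        return new ExpiringSingleton();
                    }
                });

        private ExpiringSingleton() {
            // expensive initialization (e.g. a kerberos login) would happen here
        }

        public static ExpiringSingleton getInstance() {
            // getUnchecked wraps any exception thrown by load() in an UncheckedExecutionException
            return CACHE.getUnchecked(KEY);
        }
    }
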
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 6af0e64..b0a7b74 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -814,6 +814,11 @@ public final class Constants {
     public static final String KERBEROS = "kerberos";
 
     /**
+     * kerberos expire time
+     */
+    public static final String KERBEROS_EXPIRE_TIME = "kerberos.expire.time";
+
+    /**
      * java.security.krb5.conf
      */
     public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 6cb58a4..e767911 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -16,6 +16,9 @@
  */
 package org.apache.dolphinscheduler.common.utils;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
@@ -37,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -48,30 +52,37 @@ public class HadoopUtils implements Closeable {
 
     private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
 
-    private static HadoopUtils instance = new HadoopUtils();
+    private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
+
+    private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
+            .newBuilder()
+            .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
+            .build(new CacheLoader<String, HadoopUtils>() {
+                @Override
+                public HadoopUtils load(String key) throws Exception {
+                    return new HadoopUtils();
+                }
+            });
+
     private static Configuration configuration;
     private static FileSystem fs;
 
-    private String hdfsUser;
+    private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
 
-    private HadoopUtils(){
-        hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
+    private HadoopUtils() {
         init();
         initHdfsPath();
     }
 
-    public static HadoopUtils getInstance(){
-        // if kerberos startup , renew HadoopUtils
-        if (CommonUtils.getKerberosStartupState()){
-            return new HadoopUtils();
-        }
-        return instance;
+    public static HadoopUtils getInstance() {
+
+        return cache.getUnchecked(HADOOP_UTILS_KEY);
     }
 
     /**
      * init dolphinscheduler root path in hdfs
      */
-    private void initHdfsPath(){
+    private void initHdfsPath() {
         String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH);
         Path path = new Path(hdfsPath);
 
@@ -80,7 +91,7 @@ public class HadoopUtils implements Closeable {
                 fs.mkdirs(path);
             }
         } catch (Exception e) {
-            logger.error(e.getMessage(),e);
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -88,82 +99,75 @@ public class HadoopUtils implements Closeable {
     /**
      * init hadoop configuration
      */
-    private void init() {
-        if (configuration == null) {
-            synchronized (HadoopUtils.class) {
-                if (configuration == null) {
-                    try {
-                        configuration = new Configuration();
-
-                        String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
-                        ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
-
-                        if (resUploadType == ResUploadType.HDFS){
-                            if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){
-                                System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
-                                        PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
-                                configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
-                                UserGroupInformation.setConfiguration(configuration);
-                                UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
-                                        PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
-                            }
+    private static void init() {
+        try {
+            configuration = new Configuration();
+
+            String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
+            ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
+
+            if (resUploadType == ResUploadType.HDFS) {
+                if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)) {
+                    System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
+                            PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
+                    configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+                    UserGroupInformation.setConfiguration(configuration);
+                    UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
+                            PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
+                }
 
-                            String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
-                            //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
-                            // the default is the local file system
-                            if(defaultFS.startsWith("file")){
-                                String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
-                                if(StringUtils.isNotBlank(defaultFSProp)){
-                                    Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
-                                    configuration.set(Constants.FS_DEFAULTFS,defaultFSProp);
-                                    fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
-                                }else{
-                                    logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
-                                    throw new RuntimeException(
-                                        String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
-                                        );
-                                }
-                            }else{
-                                logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
-                            }
+                String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
+                //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
+                // the default is the local file system
+                if (defaultFS.startsWith("file")) {
+                    String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
+                    if (StringUtils.isNotBlank(defaultFSProp)) {
+                        Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
+                        configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
+                        fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
+                    } else {
+                        logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
+                        throw new RuntimeException(
+                                String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
+                        );
+                    }
+                } else {
+                    logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
+                }
 
-                            if (fs == null) {
-                                if(StringUtils.isNotEmpty(hdfsUser)){
-                                    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
-                                    ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
-                                        @Override
-                                        public Boolean run() throws Exception {
-                                            fs = FileSystem.get(configuration);
-                                            return true;
-                                        }
-                                    });
-                                }else{
-                                    logger.warn("hdfs.root.user is not set value!");
-                                    fs = FileSystem.get(configuration);
-                                }
+                if (fs == null) {
+                    if (StringUtils.isNotEmpty(hdfsUser)) {
+                        UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
+                        ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+                            @Override
+                            public Boolean run() throws Exception {
+                                fs = FileSystem.get(configuration);
+                                return true;
                             }
-                        }else if (resUploadType == ResUploadType.S3){
-                            configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
-                            configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
-                            configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
-                            configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
-                            fs = FileSystem.get(configuration);
-                        }
-
-
-                        String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
-                        String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
-                        if (!StringUtils.isEmpty(rmHaIds)) {
-                            appAddress = getAppAddress(appAddress, rmHaIds);
-                            logger.info("appAddress : {}", appAddress);
-                        }
-                        configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
-                    } catch (Exception e) {
-                        logger.error(e.getMessage(), e);
+                        });
+                    } else {
+                        logger.warn("hdfs.root.user is not set value!");
+                        fs = FileSystem.get(configuration);
                     }
-
                 }
+            } else if (resUploadType == ResUploadType.S3) {
+                configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
+                configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
+                configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
+                configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
+                fs = FileSystem.get(configuration);
+            }
+
+
+            String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
+            String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
+            if (!StringUtils.isEmpty(rmHaIds)) {
+                appAddress = getAppAddress(appAddress, rmHaIds);
+                logger.info("appAddress : {}", appAddress);
             }
+            configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
         }
     }
 
@@ -187,14 +191,14 @@ public class HadoopUtils implements Closeable {
     /**
      * cat file on hdfs
      *
-     * @param hdfsFilePath  hdfs file path
+     * @param hdfsFilePath hdfs file path
      * @return byte[] byte array
      * @throws IOException errors
      */
     public byte[] catFile(String hdfsFilePath) throws IOException {
 
-        if(StringUtils.isBlank(hdfsFilePath)){
-            logger.error("hdfs file path:{} is blank",hdfsFilePath);
+        if (StringUtils.isBlank(hdfsFilePath)) {
+            logger.error("hdfs file path:{} is blank", hdfsFilePath);
             return new byte[0];
         }
 
@@ -203,29 +207,28 @@ public class HadoopUtils implements Closeable {
     }
 
 
-
     /**
      * cat file on hdfs
      *
-     * @param hdfsFilePath  hdfs file path
-     * @param skipLineNums  skip line numbers
-     * @param limit         read how many lines
+     * @param hdfsFilePath hdfs file path
+     * @param skipLineNums skip line numbers
+     * @param limit        read how many lines
      * @return content of file
      * @throws IOException errors
      */
     public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {
 
-        if (StringUtils.isBlank(hdfsFilePath)){
-            logger.error("hdfs file path:{} is blank",hdfsFilePath);
+        if (StringUtils.isBlank(hdfsFilePath)) {
+            logger.error("hdfs file path:{} is blank", hdfsFilePath);
             return Collections.emptyList();
         }
 
-        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){
+        try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
             BufferedReader br = new BufferedReader(new InputStreamReader(in));
             Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
             return stream.collect(Collectors.toList());
         }
-        
+
     }
 
     /**
@@ -258,17 +261,17 @@ public class HadoopUtils implements Closeable {
     /**
      * the src file is on the local disk.  Add it to FS at
      * the given dst name.
-
-     * @param srcFile       local file
-     * @param dstHdfsPath   destination hdfs path
-     * @param deleteSource  whether to delete the src
-     * @param overwrite     whether to overwrite an existing file
+     *
+     * @param srcFile      local file
+     * @param dstHdfsPath  destination hdfs path
+     * @param deleteSource whether to delete the src
+     * @param overwrite    whether to overwrite an existing file
      * @return if success or not
      * @throws IOException errors
      */
     public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
         Path srcPath = new Path(srcFile);
-        Path dstPath= new Path(dstHdfsPath);
+        Path dstPath = new Path(dstHdfsPath);
 
         fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);
 
@@ -278,10 +281,10 @@ public class HadoopUtils implements Closeable {
     /**
      * copy hdfs file to local
      *
-     * @param srcHdfsFilePath   source hdfs file path
-     * @param dstFile           destination file
-     * @param deleteSource      delete source
-     * @param overwrite         overwrite
+     * @param srcHdfsFilePath source hdfs file path
+     * @param dstFile         destination file
+     * @param deleteSource    delete source
+     * @param overwrite       overwrite
      * @return result of copy hdfs file to local
      * @throws IOException errors
      */
@@ -299,7 +302,7 @@ public class HadoopUtils implements Closeable {
             }
         }
 
-        if(!dstPath.getParentFile().exists()){
+        if (!dstPath.getParentFile().exists()) {
             dstPath.getParentFile().mkdirs();
         }
 
@@ -307,14 +310,13 @@ public class HadoopUtils implements Closeable {
     }
 
     /**
-     *
      * delete a file
      *
      * @param hdfsFilePath the path to delete.
-     * @param recursive if path is a directory and set to
-     * true, the directory is deleted else throws an exception. In
-     * case of a file the recursive can be set to either true or false.
-     * @return  true if delete is successful else false.
+     * @param recursive    if path is a directory and set to
+     *                     true, the directory is deleted else throws an exception. In
+     *                     case of a file the recursive can be set to either true or false.
+     * @return true if delete is successful else false.
      * @throws IOException errors
      */
     public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
@@ -339,7 +341,7 @@ public class HadoopUtils implements Closeable {
      * @return {@link FileStatus} file status
      * @throws Exception errors
      */
-    public FileStatus[] listFileStatus(String filePath)throws Exception{
+    public FileStatus[] listFileStatus(String filePath) throws Exception {
         try {
             return fs.listStatus(new Path(filePath));
         } catch (IOException e) {
@@ -351,10 +353,11 @@ public class HadoopUtils implements Closeable {
     /**
      * Renames Path src to Path dst.  Can take place on local fs
      * or remote DFS.
+     *
      * @param src path to be renamed
      * @param dst new path after rename
-     * @throws IOException on failure
      * @return true if rename is successful
+     * @throws IOException on failure
      */
     public boolean rename(String src, String dst) throws IOException {
         return fs.rename(new Path(src), new Path(dst));
@@ -400,7 +403,6 @@ public class HadoopUtils implements Closeable {
     }
 
     /**
-     *
      * @return data hdfs path
      */
     public static String getHdfsDataBasePath() {
@@ -427,11 +429,11 @@ public class HadoopUtils implements Closeable {
      * hdfs user dir
      *
      * @param tenantCode tenant code
-     * @param userId user id
+     * @param userId     user id
      * @return hdfs resource dir
      */
-    public static String getHdfsUserDir(String tenantCode,int userId) {
-        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId);
+    public static String getHdfsUserDir(String tenantCode, int userId) {
+        return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
     }
 
     /**
@@ -479,7 +481,7 @@ public class HadoopUtils implements Closeable {
      * getAppAddress
      *
      * @param appAddress app address
-     * @param rmHa resource manager ha
+     * @param rmHa       resource manager ha
      * @return app address
      */
     public static String getAppAddress(String appAddress, String rmHa) {
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 5a4aa14..8391a9e 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -91,4 +91,4 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
 # If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
 yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
 
-
+kerberos.expire.time=7
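
The kerberos.expire.time entry appended to common.properties above is read in days: the new cache passes it to PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), so leaving the property unset keeps a seven-day renewal interval.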