Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/03/15 13:14:50 UTC
[incubator-dolphinscheduler] branch dev updated: Cache HadoopUtils instance with specific days expire time (#2181)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new e0fc317 Cache HadoopUtils instance with specific days expire time (#2181)
e0fc317 is described below
commit e0fc3171da3b731433a4e8da3403070323007cce
Author: tswstarplanet <ts...@apache.org>
AuthorDate: Sun Mar 15 21:14:43 2020 +0800
Cache HadoopUtils instance with specific days expire time (#2181)
* Cache HadoopUtils instance with 7 days expire time
* solve sonar issue
* add kerberos expire time config
* move KERBEROS_EXPIRE_TIME to Constants.java
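The core idiom in this commit is Guava's LoadingCache used as a self-expiring singleton holder: the cached HadoopUtils entry is evicted a configurable number of days after it is written, so the next getInstance() call re-runs the loader, constructing a fresh instance and re-executing the Kerberos keytab login. A minimal standalone sketch of that idiom (class and key names are invented for illustration; the real wiring is in the HadoopUtils.java hunks below):

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.TimeUnit;

    public class ExpiringSingleton {
        private static final String KEY = "SINGLETON_KEY";

        // Entries are dropped 7 days after creation; the loader rebuilds
        // the instance on the next lookup after expiry.
        private static final LoadingCache<String, ExpiringSingleton> CACHE = CacheBuilder
                .newBuilder()
                .expireAfterWrite(7, TimeUnit.DAYS)
                .build(new CacheLoader<String, ExpiringSingleton>() {
                    @Override
                    public ExpiringSingleton load(String key) {
                        return new ExpiringSingleton();
                    }
                });

        private ExpiringSingleton() {
            // expensive setup, e.g. a fresh Kerberos login
        }

        public static ExpiringSingleton getInstance() {
            // getUnchecked rethrows loader failures as UncheckedExecutionException
            return CACHE.getUnchecked(KEY);
        }
    }

A side benefit of this pattern: CacheLoader.load runs under the cache's per-key lock, so only one thread ever constructs the instance, which is why the commit can also drop the double-checked locking that previously guarded init().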
---
.../apache/dolphinscheduler/common/Constants.java | 5 +
.../dolphinscheduler/common/utils/HadoopUtils.java | 234 +++++++++++----------
.../src/main/resources/common.properties | 2 +-
3 files changed, 124 insertions(+), 117 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 6af0e64..b0a7b74 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -814,6 +814,11 @@ public final class Constants {
public static final String KERBEROS = "kerberos";
/**
+ * kerberos expire time
+ */
+ public static final String KERBEROS_EXPIRE_TIME = "kerberos.expire.time";
+
+ /**
* java.security.krb5.conf
*/
public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
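Note that the new constant is only the property key; the 7-day default lives at the call site in HadoopUtils (PropertyUtils.getInt(key, defaultValue), per the hunk below). A brief illustration of how the key resolves at runtime:

    // Falls back to 7 when kerberos.expire.time is absent from common.properties.
    int expireDays = PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7);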
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 6cb58a4..e767911 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -16,6 +16,9 @@
*/
package org.apache.dolphinscheduler.common.utils;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
@@ -37,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -48,30 +52,37 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
- private static HadoopUtils instance = new HadoopUtils();
+ private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
+
+ private static final LoadingCache<String, HadoopUtils> cache = CacheBuilder
+ .newBuilder()
+ .expireAfterWrite(PropertyUtils.getInt(Constants.KERBEROS_EXPIRE_TIME, 7), TimeUnit.DAYS)
+ .build(new CacheLoader<String, HadoopUtils>() {
+ @Override
+ public HadoopUtils load(String key) throws Exception {
+ return new HadoopUtils();
+ }
+ });
+
private static Configuration configuration;
private static FileSystem fs;
- private String hdfsUser;
+ private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
- private HadoopUtils(){
- hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
+ private HadoopUtils() {
init();
initHdfsPath();
}
- public static HadoopUtils getInstance(){
- // if kerberos startup , renew HadoopUtils
- if (CommonUtils.getKerberosStartupState()){
- return new HadoopUtils();
- }
- return instance;
+ public static HadoopUtils getInstance() {
+
+ return cache.getUnchecked(HADOOP_UTILS_KEY);
}
/**
* init dolphinscheduler root path in hdfs
*/
- private void initHdfsPath(){
+ private void initHdfsPath() {
String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH);
Path path = new Path(hdfsPath);
@@ -80,7 +91,7 @@ public class HadoopUtils implements Closeable {
fs.mkdirs(path);
}
} catch (Exception e) {
- logger.error(e.getMessage(),e);
+ logger.error(e.getMessage(), e);
}
}
@@ -88,82 +99,75 @@ public class HadoopUtils implements Closeable {
/**
* init hadoop configuration
*/
- private void init() {
- if (configuration == null) {
- synchronized (HadoopUtils.class) {
- if (configuration == null) {
- try {
- configuration = new Configuration();
-
- String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
- ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
-
- if (resUploadType == ResUploadType.HDFS){
- if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){
- System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
- PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
- configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
- UserGroupInformation.setConfiguration(configuration);
- UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
- PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
- }
+ private static void init() {
+ try {
+ configuration = new Configuration();
+
+ String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
+ ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
+
+ if (resUploadType == ResUploadType.HDFS) {
+ if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)) {
+ System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
+ PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
+ configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
+ PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_PATH));
+ }
- String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
- //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
- // the default is the local file system
- if(defaultFS.startsWith("file")){
- String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
- if(StringUtils.isNotBlank(defaultFSProp)){
- Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
- configuration.set(Constants.FS_DEFAULTFS,defaultFSProp);
- fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
- }else{
- logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS );
- throw new RuntimeException(
- String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
- );
- }
- }else{
- logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
- }
+ String defaultFS = configuration.get(Constants.FS_DEFAULTFS);
+ //first get key from core-site.xml hdfs-site.xml ,if null ,then try to get from properties file
+ // the default is the local file system
+ if (defaultFS.startsWith("file")) {
+ String defaultFSProp = PropertyUtils.getString(Constants.FS_DEFAULTFS);
+ if (StringUtils.isNotBlank(defaultFSProp)) {
+ Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
+ configuration.set(Constants.FS_DEFAULTFS, defaultFSProp);
+ fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
+ } else {
+ logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULTFS);
+ throw new RuntimeException(
+ String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULTFS)
+ );
+ }
+ } else {
+ logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULTFS, defaultFS);
+ }
- if (fs == null) {
- if(StringUtils.isNotEmpty(hdfsUser)){
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
- ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
- @Override
- public Boolean run() throws Exception {
- fs = FileSystem.get(configuration);
- return true;
- }
- });
- }else{
- logger.warn("hdfs.root.user is not set value!");
- fs = FileSystem.get(configuration);
- }
+ if (fs == null) {
+ if (StringUtils.isNotEmpty(hdfsUser)) {
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
+ ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+ @Override
+ public Boolean run() throws Exception {
+ fs = FileSystem.get(configuration);
+ return true;
}
- }else if (resUploadType == ResUploadType.S3){
- configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
- configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
- configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
- configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
- fs = FileSystem.get(configuration);
- }
-
-
- String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
- String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
- if (!StringUtils.isEmpty(rmHaIds)) {
- appAddress = getAppAddress(appAddress, rmHaIds);
- logger.info("appAddress : {}", appAddress);
- }
- configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
- } catch (Exception e) {
- logger.error(e.getMessage(), e);
+ });
+ } else {
+ logger.warn("hdfs.root.user is not set value!");
+ fs = FileSystem.get(configuration);
}
-
}
+ } else if (resUploadType == ResUploadType.S3) {
+ configuration.set(Constants.FS_DEFAULTFS, PropertyUtils.getString(Constants.FS_DEFAULTFS));
+ configuration.set(Constants.FS_S3A_ENDPOINT, PropertyUtils.getString(Constants.FS_S3A_ENDPOINT));
+ configuration.set(Constants.FS_S3A_ACCESS_KEY, PropertyUtils.getString(Constants.FS_S3A_ACCESS_KEY));
+ configuration.set(Constants.FS_S3A_SECRET_KEY, PropertyUtils.getString(Constants.FS_S3A_SECRET_KEY));
+ fs = FileSystem.get(configuration);
+ }
+
+
+ String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
+ String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
+ if (!StringUtils.isEmpty(rmHaIds)) {
+ appAddress = getAppAddress(appAddress, rmHaIds);
+ logger.info("appAddress : {}", appAddress);
}
+ configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
}
}
@@ -187,14 +191,14 @@ public class HadoopUtils implements Closeable {
/**
* cat file on hdfs
*
- * @param hdfsFilePath hdfs file path
+ * @param hdfsFilePath hdfs file path
* @return byte[] byte array
* @throws IOException errors
*/
public byte[] catFile(String hdfsFilePath) throws IOException {
- if(StringUtils.isBlank(hdfsFilePath)){
- logger.error("hdfs file path:{} is blank",hdfsFilePath);
+ if (StringUtils.isBlank(hdfsFilePath)) {
+ logger.error("hdfs file path:{} is blank", hdfsFilePath);
return new byte[0];
}
@@ -203,29 +207,28 @@ public class HadoopUtils implements Closeable {
}
-
/**
* cat file on hdfs
*
- * @param hdfsFilePath hdfs file path
- * @param skipLineNums skip line numbers
- * @param limit read how many lines
+ * @param hdfsFilePath hdfs file path
+ * @param skipLineNums skip line numbers
+ * @param limit read how many lines
* @return content of file
* @throws IOException errors
*/
public List<String> catFile(String hdfsFilePath, int skipLineNums, int limit) throws IOException {
- if (StringUtils.isBlank(hdfsFilePath)){
- logger.error("hdfs file path:{} is blank",hdfsFilePath);
+ if (StringUtils.isBlank(hdfsFilePath)) {
+ logger.error("hdfs file path:{} is blank", hdfsFilePath);
return Collections.emptyList();
}
- try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))){
+ try (FSDataInputStream in = fs.open(new Path(hdfsFilePath))) {
BufferedReader br = new BufferedReader(new InputStreamReader(in));
Stream<String> stream = br.lines().skip(skipLineNums).limit(limit);
return stream.collect(Collectors.toList());
}
-
+
}
/**
@@ -258,17 +261,17 @@ public class HadoopUtils implements Closeable {
/**
* the src file is on the local disk. Add it to FS at
* the given dst name.
-
- * @param srcFile local file
- * @param dstHdfsPath destination hdfs path
- * @param deleteSource whether to delete the src
- * @param overwrite whether to overwrite an existing file
+ *
+ * @param srcFile local file
+ * @param dstHdfsPath destination hdfs path
+ * @param deleteSource whether to delete the src
+ * @param overwrite whether to overwrite an existing file
* @return if success or not
* @throws IOException errors
*/
public boolean copyLocalToHdfs(String srcFile, String dstHdfsPath, boolean deleteSource, boolean overwrite) throws IOException {
Path srcPath = new Path(srcFile);
- Path dstPath= new Path(dstHdfsPath);
+ Path dstPath = new Path(dstHdfsPath);
fs.copyFromLocalFile(deleteSource, overwrite, srcPath, dstPath);
@@ -278,10 +281,10 @@ public class HadoopUtils implements Closeable {
/**
* copy hdfs file to local
*
- * @param srcHdfsFilePath source hdfs file path
- * @param dstFile destination file
- * @param deleteSource delete source
- * @param overwrite overwrite
+ * @param srcHdfsFilePath source hdfs file path
+ * @param dstFile destination file
+ * @param deleteSource delete source
+ * @param overwrite overwrite
* @return result of copy hdfs file to local
* @throws IOException errors
*/
@@ -299,7 +302,7 @@ public class HadoopUtils implements Closeable {
}
}
- if(!dstPath.getParentFile().exists()){
+ if (!dstPath.getParentFile().exists()) {
dstPath.getParentFile().mkdirs();
}
@@ -307,14 +310,13 @@ public class HadoopUtils implements Closeable {
}
/**
- *
* delete a file
*
* @param hdfsFilePath the path to delete.
- * @param recursive if path is a directory and set to
- * true, the directory is deleted else throws an exception. In
- * case of a file the recursive can be set to either true or false.
- * @return true if delete is successful else false.
+ * @param recursive if path is a directory and set to
+ * true, the directory is deleted else throws an exception. In
+ * case of a file the recursive can be set to either true or false.
+ * @return true if delete is successful else false.
* @throws IOException errors
*/
public boolean delete(String hdfsFilePath, boolean recursive) throws IOException {
@@ -339,7 +341,7 @@ public class HadoopUtils implements Closeable {
* @return {@link FileStatus} file status
* @throws Exception errors
*/
- public FileStatus[] listFileStatus(String filePath)throws Exception{
+ public FileStatus[] listFileStatus(String filePath) throws Exception {
try {
return fs.listStatus(new Path(filePath));
} catch (IOException e) {
@@ -351,10 +353,11 @@ public class HadoopUtils implements Closeable {
/**
* Renames Path src to Path dst. Can take place on local fs
* or remote DFS.
+ *
* @param src path to be renamed
* @param dst new path after rename
- * @throws IOException on failure
* @return true if rename is successful
+ * @throws IOException on failure
*/
public boolean rename(String src, String dst) throws IOException {
return fs.rename(new Path(src), new Path(dst));
@@ -400,7 +403,6 @@ public class HadoopUtils implements Closeable {
}
/**
- *
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
@@ -427,11 +429,11 @@ public class HadoopUtils implements Closeable {
* hdfs user dir
*
* @param tenantCode tenant code
- * @param userId user id
+ * @param userId user id
* @return hdfs resource dir
*/
- public static String getHdfsUserDir(String tenantCode,int userId) {
- return String.format("%s/home/%d", getHdfsTenantDir(tenantCode),userId);
+ public static String getHdfsUserDir(String tenantCode, int userId) {
+ return String.format("%s/home/%d", getHdfsTenantDir(tenantCode), userId);
}
/**
@@ -479,7 +481,7 @@ public class HadoopUtils implements Closeable {
* getAppAddress
*
* @param appAddress app address
- * @param rmHa resource manager ha
+ * @param rmHa resource manager ha
* @return app address
*/
public static String getAppAddress(String appAddress, String rmHa) {
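Call sites are unaffected by the refactor; under Kerberos they now share one cached instance instead of constructing a new HadoopUtils on every getInstance() call. A hedged usage sketch (the HDFS path and error handling are illustrative only):

    try {
        // Same API as before; the cache transparently rebuilds the
        // instance, and re-runs the keytab login, after expiry.
        byte[] content = HadoopUtils.getInstance().catFile("/dolphinscheduler/exec/example.log");
    } catch (IOException e) {
        logger.error("read from hdfs failed", e);
    }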
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 5a4aa14..8391a9e 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -91,4 +91,4 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
-
+kerberos.expire.time=7
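The new property is the cache TTL in days. To rotate the cached instance (and refresh its Kerberos credentials) more often, one would presumably just lower the value in common.properties, for example:

    # rebuild the cached HadoopUtils, re-running the keytab login, every 2 days
    kerberos.expire.time=2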