You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by jo...@apache.org on 2020/03/25 07:16:47 UTC
[incubator-dolphinscheduler] branch refactor-worker updated: [refactor-worker] simplify master、 worker、alert、dao properties (#2304)
This is an automated email from the ASF dual-hosted git repository.
journey pushed a commit to branch refactor-worker
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/refactor-worker by this push:
new ace907e [refactor-worker] simplify master、 worker、alert、dao properties (#2304)
ace907e is described below
commit ace907e7ddf687c1f3d4de3f02804d9493358b9a
Author: dailidong <da...@gmail.com>
AuthorDate: Wed Mar 25 15:16:42 2020 +0800
[refactor-worker] simplify master、 worker、alert、dao properties (#2304)
* update logback
* update log
* refactor worker registry (#2107)
* Refactor worker (#2115)
* refactor worker registry
* refactor master server
* Modify workgroupid parameter name (#2105)
* Delete worker group management page
* Modify workgroupid parameter name
* Refactor worker (#2121)
* refactor worker registry
* refactor master server
* refactor MasterSchedulerService
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath (#2126)
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* 1, master persistent task
2. extract master and worker communication model
* add license
* modify javadoc error
* TaskExecutionContext create modify
* buildAckCommand taskInstanceId not set modify
* java doc error modify
* add comment
* ExecutorManager interface add generic type
* add TaskInstanceCacheManager receive Worker report result
* TaskInstance setExecutePath
* add TaskInstanceCacheManager to receive Worker Task result report
* TaskInstanceCacheManager add remove method
* add license
* add dispatcht task method
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* AbstractCommandExecutor remove db access
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* taskInstanceCache is null ,need load from db
* 1,worker TaskPros use TaskExecutionContext replase
2,Master kill Task , KillTaskProcessor modify
* worker remove db
* ShellTask modify
* master persistence processId and appIds
* master persistence processId and appIds
* master add kill task logic
* master add kill task logic
* master add kill task logic
* javadoc error modify
* remove chinese log
* executeDirectly method add Override
* remote module modify
* TaskKillResponseProcessor command type modify
* create buildKillCommand
* host add host:port format
* host add host:port format
* TaskAckProcessor modify
* TaskAckProcessor modify
* task prioriry refator
* remove ITaskQueue
* task prioriry refator
* remove ITaskQueue
* TaskPriority refactor
* remove logs
* WorkerServer refactor
* MasterSchedulerService modify
* WorkerConfig listen port modify
* modify master and worker listen port
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
* cancelTaskInstance set TaskExecutionContext host,logPath,executePath
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
* not exist in openjdk,just delete
* add master and worker properties
* add master and worker properties
* add master and worker properties
* fix cpu 100% bug
* simplify master、 worker、alert、dao properties
* add master and worker properties
* add master and worker properties
* add master and worker properties
Co-authored-by: Tboy <gu...@immomo.com>
Co-authored-by: break60 <79...@qq.com>
Co-authored-by: qiaozhanwei <qi...@outlook.com>
Co-authored-by: qiaozhanwei <qi...@analysys.com.cn>
---
.../dolphinscheduler/alert/utils/MailUtils.java | 2 +-
.../alert/utils/PropertyUtils.java | 12 ++++
.../src/main/resources/alert.properties | 18 ++---
.../apache/dolphinscheduler/common/Constants.java | 76 +++++++++++++++------
.../dolphinscheduler/common/utils/CommonUtils.java | 20 ++++--
.../dolphinscheduler/common/utils/FileUtils.java | 38 +++++++++--
.../dolphinscheduler/common/utils/HadoopUtils.java | 45 +++++++++---
.../common/utils/PropertyUtils.java | 30 +++++++-
.../src/main/resources/common.properties | 79 +++++++---------------
.../apache/dolphinscheduler/dao/TaskRecordDao.java | 26 ++-----
.../dao/datasource/ConnectionFactory.java | 27 +-------
.../dao/datasource/SpringConnectionFactory.java | 60 +++++++---------
.../dolphinscheduler/dao/utils/PropertyUtils.java | 55 ++++++++++++++-
...pplication.properties => datasource.properties} | 54 ++++++++-------
.../master/consumer/TaskUpdateQueueConsumer.java | 1 +
.../service/quartz/QuartzExecutors.java | 51 +++++++++++++-
.../service/zk/ZookeeperConfig.java | 2 +-
.../src/main/resources/quartz.properties | 41 +++++------
.../src/main/resources/zookeeper.properties | 32 +++++++++
19 files changed, 429 insertions(+), 240 deletions(-)
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
index 99efdc8..b0aa418 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/MailUtils.java
@@ -55,7 +55,7 @@ public class MailUtils {
public static final Boolean mailUseSSL = PropertyUtils.getBoolean(Constants.MAIL_SMTP_SSL_ENABLE);
- public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH);
+ public static final String xlsFilePath = PropertyUtils.getString(Constants.XLS_FILE_PATH,"/tmp/xls");
public static final String starttlsEnable = PropertyUtils.getString(Constants.MAIL_SMTP_STARTTLS_ENABLE);
diff --git a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
index c2f479d..91f7261 100644
--- a/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
+++ b/dolphinscheduler-alert/src/main/java/org/apache/dolphinscheduler/alert/utils/PropertyUtils.java
@@ -83,6 +83,18 @@ public class PropertyUtils {
* get property value
*
* @param key property name
+ * @param defaultVal default value
+ * @return property value
+ */
+ public static String getString(String key, String defaultVal) {
+ String val = properties.getProperty(key.trim());
+ return val == null ? defaultVal : val;
+ }
+
+ /**
+ * get property value
+ *
+ * @param key property name
* @return get property int value , if key == null, then return -1
*/
public static int getInt(String key) {
diff --git a/dolphinscheduler-alert/src/main/resources/alert.properties b/dolphinscheduler-alert/src/main/resources/alert.properties
index 000d065..db34452 100644
--- a/dolphinscheduler-alert/src/main/resources/alert.properties
+++ b/dolphinscheduler-alert/src/main/resources/alert.properties
@@ -36,18 +36,18 @@ mail.smtp.ssl.enable=false
mail.smtp.ssl.trust=xxx.xxx.com
#xls file path,need create if not exist
-xls.file.path=/tmp/xls
+#xls.file.path=/tmp/xls
# Enterprise WeChat configuration
enterprise.wechat.enable=false
-enterprise.wechat.corp.id=xxxxxxx
-enterprise.wechat.secret=xxxxxxx
-enterprise.wechat.agent.id=xxxxxxx
-enterprise.wechat.users=xxxxxxx
-enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
-enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
-enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
-enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}
+#enterprise.wechat.corp.id=xxxxxxx
+#enterprise.wechat.secret=xxxxxxx
+#enterprise.wechat.agent.id=xxxxxxx
+#enterprise.wechat.users=xxxxxxx
+#enterprise.wechat.token.url=https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid=$corpId&corpsecret=$secret
+#enterprise.wechat.push.url=https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=$token
+#enterprise.wechat.team.send.msg={\"toparty\":\"$toParty\",\"agentid\":\"$agentId\",\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"},\"safe\":\"0\"}
+#enterprise.wechat.user.send.msg={\"touser\":\"$toUser\",\"agentid\":\"$agentId\",\"msgtype\":\"markdown\",\"markdown\":{\"content\":\"$msg\"}}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 4a3355a..fd95dfd 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -25,9 +25,43 @@ import java.util.regex.Pattern;
* Constants
*/
public final class Constants {
+
private Constants() {
throw new IllegalStateException("Constants class");
}
+
+ /**
+ * quartz config
+ */
+ public static final String ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS = "org.quartz.jobStore.driverDelegateClass";
+ public static final String ORG_QUARTZ_SCHEDULER_INSTANCENAME = "org.quartz.scheduler.instanceName";
+ public static final String ORG_QUARTZ_SCHEDULER_INSTANCEID = "org.quartz.scheduler.instanceId";
+ public static final String ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON = "org.quartz.scheduler.makeSchedulerThreadDaemon";
+ public static final String ORG_QUARTZ_JOBSTORE_USEPROPERTIES = "org.quartz.jobStore.useProperties";
+ public static final String ORG_QUARTZ_THREADPOOL_CLASS = "org.quartz.threadPool.class";
+ public static final String ORG_QUARTZ_THREADPOOL_THREADCOUNT = "org.quartz.threadPool.threadCount";
+ public static final String ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS = "org.quartz.threadPool.makeThreadsDaemons";
+ public static final String ORG_QUARTZ_THREADPOOL_THREADPRIORITY = "org.quartz.threadPool.threadPriority";
+ public static final String ORG_QUARTZ_JOBSTORE_CLASS = "org.quartz.jobStore.class";
+ public static final String ORG_QUARTZ_JOBSTORE_TABLEPREFIX = "org.quartz.jobStore.tablePrefix";
+ public static final String ORG_QUARTZ_JOBSTORE_ISCLUSTERED = "org.quartz.jobStore.isClustered";
+ public static final String ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD = "org.quartz.jobStore.misfireThreshold";
+ public static final String ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL = "org.quartz.jobStore.clusterCheckinInterval";
+ public static final String ORG_QUARTZ_JOBSTORE_DATASOURCE = "org.quartz.jobStore.dataSource";
+ public static final String ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS = "org.quartz.dataSource.myDs.connectionProvider.class";
+
+ /**
+ * quartz config default value
+ */
+ public static final String QUARTZ_TABLE_PREFIX = "QRTZ_";
+ public static final String QUARTZ_MISFIRETHRESHOLD = "60000";
+ public static final String QUARTZ_CLUSTERCHECKININTERVAL = "5000";
+ public static final String QUARTZ_DATASOURCE = "myDs";
+ public static final String QUARTZ_THREADCOUNT = "25";
+ public static final String QUARTZ_THREADPRIORITY = "5";
+ public static final String QUARTZ_INSTANCENAME = "DolphinScheduler";
+ public static final String QUARTZ_INSTANCEID = "AUTO";
+
/**
* common properties path
*/
@@ -56,9 +90,11 @@ public final class Constants {
/**
- * yarn.resourcemanager.ha.rm.idsfs.defaultFS
+ * yarn.resourcemanager.ha.rm.ids
*/
public static final String YARN_RESOURCEMANAGER_HA_RM_IDS = "yarn.resourcemanager.ha.rm.ids";
+ public static final String YARN_RESOURCEMANAGER_HA_XX = "xx";
+
/**
* yarn.application.status.address
@@ -72,31 +108,25 @@ public final class Constants {
public static final String HDFS_ROOT_USER = "hdfs.root.user";
/**
- * hdfs configuration
- * data.store2hdfs.basepath
+ * hdfs/s3 configuration
+ * resource.upload.path
*/
- public static final String DATA_STORE_2_HDFS_BASEPATH = "data.store2hdfs.basepath";
+ public static final String RESOURCE_UPLOAD_PATH = "resource.upload.path";
/**
- * data.basedir.path
+ * data basedir path
*/
public static final String DATA_BASEDIR_PATH = "data.basedir.path";
/**
- * data.download.basedir.path
- */
- public static final String DATA_DOWNLOAD_BASEDIR_PATH = "data.download.basedir.path";
-
- /**
- * process.exec.basepath
- */
- public static final String PROCESS_EXEC_BASEPATH = "process.exec.basepath";
-
- /**
* dolphinscheduler.env.path
*/
public static final String DOLPHINSCHEDULER_ENV_PATH = "dolphinscheduler.env.path";
+ /**
+ * environment properties default path
+ */
+ public static final String ENV_PATH = "env/dolphinscheduler_env.sh";
/**
* python home
@@ -108,15 +138,23 @@ public final class Constants {
*/
public static final String RESOURCE_VIEW_SUFFIXS = "resource.view.suffixs";
+ public static final String RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE = "txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties";
+
/**
* development.state
*/
public static final String DEVELOPMENT_STATE = "development.state";
+ public static final String DEVELOPMENT_STATE_DEFAULT_VALUE = "true";
+
+ /**
+ * string true
+ */
+ public static final String STRING_TRUE = "true";
/**
- * res.upload.startup.type
+ * resource storage type
*/
- public static final String RES_UPLOAD_STARTUP_TYPE = "res.upload.startup.type";
+ public static final String RESOURCE_STORAGE_TYPE = "resource.storage.type";
/**
* MasterServer directory registered in zookeeper
@@ -346,9 +384,9 @@ public final class Constants {
public static final String FLOWNODE_RUN_FLAG_FORBIDDEN = "FORBIDDEN";
/**
- * task record configuration path
+ * datasource configuration path
*/
- public static final String APPLICATION_PROPERTIES = "application.properties";
+ public static final String DATASOURCE_PROPERTIES = "/datasource.properties";
public static final String TASK_RECORD_URL = "task.record.datasource.url";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
index d15ede2..3032740 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/CommonUtils.java
@@ -20,13 +20,18 @@ import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
+import java.net.URL;
/**
* common utils
*/
public class CommonUtils {
+ private static final Logger logger = LoggerFactory.getLogger(CommonUtils.class);
+
private CommonUtils() {
throw new IllegalStateException("CommonUtils class");
}
@@ -37,7 +42,14 @@ public class CommonUtils {
public static String getSystemEnvPath() {
String envPath = PropertyUtils.getString(Constants.DOLPHINSCHEDULER_ENV_PATH);
if (StringUtils.isEmpty(envPath)) {
- envPath = System.getProperty("user.home") + File.separator + ".bash_profile";
+ URL envDefaultPath = CommonUtils.class.getClassLoader().getResource(Constants.ENV_PATH);
+
+ if (envDefaultPath != null){
+ envPath = envDefaultPath.getPath();
+ logger.debug("env path :{}", envPath);
+ }else{
+ envPath = System.getProperty("user.home") + File.separator + ".bash_profile";
+ }
}
return envPath;
@@ -55,7 +67,7 @@ public class CommonUtils {
* @return is develop mode
*/
public static boolean isDevelopMode() {
- return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE);
+ return PropertyUtils.getBoolean(Constants.DEVELOPMENT_STATE, true);
}
@@ -65,9 +77,9 @@ public class CommonUtils {
* @return true if upload resource is HDFS and kerberos startup
*/
public static boolean getKerberosStartupState(){
- String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
+ String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
- Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE);
+ Boolean kerberosStartupState = PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false);
return resUploadType == ResUploadType.HDFS && kerberosStartupState;
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index c84848f..9ae315a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -34,6 +34,8 @@ import static org.apache.dolphinscheduler.common.Constants.*;
public class FileUtils {
public static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
+ public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler");
+
/**
* get file suffix
*
@@ -59,7 +61,14 @@ public class FileUtils {
* @return download file name
*/
public static String getDownloadFilename(String filename) {
- return String.format("%s/%s/%s", PropertyUtils.getString(DATA_DOWNLOAD_BASEDIR_PATH), DateUtils.getCurrentTime(YYYYMMDDHHMMSS), filename);
+ String fileName = String.format("%s/download/%s/%s", DATA_BASEDIR, DateUtils.getCurrentTime(YYYYMMDDHHMMSS), filename);
+
+ File file = new File(fileName);
+ if (!file.getParentFile().exists()){
+ file.getParentFile().mkdirs();
+ }
+
+ return fileName;
}
/**
@@ -70,7 +79,13 @@ public class FileUtils {
* @return local file path
*/
public static String getUploadFilename(String tenantCode, String filename) {
- return String.format("%s/%s/resources/%s", PropertyUtils.getString(DATA_BASEDIR_PATH), tenantCode, filename);
+ String fileName = String.format("%s/%s/resources/%s", DATA_BASEDIR, tenantCode, filename);
+ File file = new File(fileName);
+ if (!file.getParentFile().exists()){
+ file.getParentFile().mkdirs();
+ }
+
+ return fileName;
}
/**
@@ -82,9 +97,14 @@ public class FileUtils {
* @return directory of process execution
*/
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) {
-
- return String.format("%s/process/%s/%s/%s/%s", PropertyUtils.getString(PROCESS_EXEC_BASEPATH), Integer.toString(projectId),
+ String fileName = String.format("%s/exec/process/%s/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId),Integer.toString(taskInstanceId));
+ File file = new File(fileName);
+ if (!file.getParentFile().exists()){
+ file.getParentFile().mkdirs();
+ }
+
+ return fileName;
}
/**
@@ -95,15 +115,21 @@ public class FileUtils {
* @return directory of process instances
*/
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) {
- return String.format("%s/process/%s/%s/%s", PropertyUtils.getString(PROCESS_EXEC_BASEPATH), Integer.toString(projectId),
+ String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId));
+ File file = new File(fileName);
+ if (!file.getParentFile().exists()){
+ file.getParentFile().mkdirs();
+ }
+
+ return fileName;
}
/**
* @return get suffixes for resource files that support online viewing
*/
public static String getResourceViewSuffixs() {
- return PropertyUtils.getString(RESOURCE_VIEW_SUFFIXS);
+ return PropertyUtils.getString(RESOURCE_VIEW_SUFFIXS, RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE);
}
/**
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 541281f..431d015 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -38,6 +38,8 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.dolphinscheduler.common.Constants.RESOURCE_UPLOAD_PATH;
+
/**
* hadoop utils
* single instance
@@ -47,8 +49,11 @@ public class HadoopUtils implements Closeable {
private static final Logger logger = LoggerFactory.getLogger(HadoopUtils.class);
private static String hdfsUser = PropertyUtils.getString(Constants.HDFS_ROOT_USER);
+ public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH,"/dolphinscheduler");
+
private static volatile HadoopUtils instance = new HadoopUtils();
private static volatile Configuration configuration;
+ private static volatile boolean yarnEnabled = false;
private static FileSystem fs;
@@ -72,8 +77,7 @@ public class HadoopUtils implements Closeable {
* init dolphinscheduler root path in hdfs
*/
private void initHdfsPath(){
- String hdfsPath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH);
- Path path = new Path(hdfsPath);
+ Path path = new Path(resourceUploadPath);
try {
if (!fs.exists(path)) {
@@ -95,11 +99,11 @@ public class HadoopUtils implements Closeable {
try {
configuration = new Configuration();
- String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
+ String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
if (resUploadType == ResUploadType.HDFS){
- if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE)){
+ if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
@@ -151,14 +155,28 @@ public class HadoopUtils implements Closeable {
fs = FileSystem.get(configuration);
}
-
+ /**
+ * if rmHaIds includes xx, it signs not use resourcemanager
+ * otherwise:
+ * if rmHaIds is empty, single resourcemanager enabled
+ * if rmHaIds not empty: resourcemanager HA enabled
+ */
String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
- if (!StringUtils.isEmpty(rmHaIds)) {
+ //not use resourcemanager
+ if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){
+ yarnEnabled = false;
+ } else if (!StringUtils.isEmpty(rmHaIds)) {
+ //resourcemanager HA enabled
appAddress = getAppAddress(appAddress, rmHaIds);
+ yarnEnabled = true;
logger.info("appAddress : {}", appAddress);
+ } else {
+ //single resourcemanager enabled
+ yarnEnabled = true;
}
configuration.set(Constants.YARN_APPLICATION_STATUS_ADDRESS, appAddress);
+
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
@@ -361,6 +379,16 @@ public class HadoopUtils implements Closeable {
return fs.rename(new Path(src), new Path(dst));
}
+ /**
+ *
+ * haddop resourcemanager enabled or not
+ *
+ * @return true if haddop resourcemanager enabled
+ * @throws IOException errors
+ */
+ public boolean isYarnEnabled() {
+ return yarnEnabled;
+ }
/**
* get the state of an application
@@ -405,12 +433,11 @@ public class HadoopUtils implements Closeable {
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
- String basePath = PropertyUtils.getString(Constants.DATA_STORE_2_HDFS_BASEPATH);
- if ("/".equals(basePath)) {
+ if ("/".equals(resourceUploadPath)) {
// if basepath is configured to /, the generated url may be //default/resources (with extra leading /)
return "";
} else {
- return basePath;
+ return resourceUploadPath;
}
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
index c3e8197..b3ec7e3 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/PropertyUtils.java
@@ -74,7 +74,7 @@ public class PropertyUtils {
* @return judge whether resource upload startup
*/
public static Boolean getResUploadStartupState(){
- String resUploadStartupType = PropertyUtils.getString(Constants.RES_UPLOAD_STARTUP_TYPE);
+ String resUploadStartupType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resUploadStartupType);
return resUploadType == ResUploadType.HDFS || resUploadType == ResUploadType.S3;
}
@@ -93,6 +93,18 @@ public class PropertyUtils {
* get property value
*
* @param key property name
+ * @param defaultVal default value
+ * @return property value
+ */
+ public static String getString(String key, String defaultVal) {
+ String val = properties.getProperty(key.trim());
+ return val == null ? defaultVal : val;
+ }
+
+ /**
+ * get property value
+ *
+ * @param key property name
* @return get property int value , if key == null, then return -1
*/
public static int getInt(String key) {
@@ -135,6 +147,22 @@ public class PropertyUtils {
}
/**
+ * get property value
+ *
+ * @param key property name
+ * @param defaultValue default value
+ * @return property value
+ */
+ public static Boolean getBoolean(String key, boolean defaultValue) {
+ String value = properties.getProperty(key.trim());
+ if(null != value){
+ return Boolean.parseBoolean(value);
+ }
+
+ return defaultValue;
+ }
+
+ /**
* get property long value
* @param key key
* @param defaultVal default value
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 5a4aa14..843aa18 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -15,80 +15,53 @@
# limitations under the License.
#
-#task queue implementation, default "zookeeper"
+# task queue implementation, default "zookeeper" TODO
dolphinscheduler.queue.impl=zookeeper
-#zookeeper cluster. multiple are separated by commas. eg. 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181
-zookeeper.quorum=localhost:2181
+# resource storage type : HDFS,S3,NONE
+resource.storage.type=HDFS
-#dolphinscheduler root directory
-zookeeper.dolphinscheduler.root=/dolphinscheduler
-
-#dolphinscheduler failover directory
-zookeeper.session.timeout=300
-zookeeper.connection.timeout=300
-zookeeper.retry.base.sleep=100
-zookeeper.retry.max.sleep=30000
-zookeeper.retry.maxtime=5
-
-# resource upload startup type : HDFS,S3,NONE
-res.upload.startup.type=NONE
-
-# Users who have permission to create directories under the HDFS root path
-hdfs.root.user=hdfs
-
-# data base dir, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
-data.store2hdfs.basepath=/dolphinscheduler
-
-# user data directory path, self configuration, please make sure the directory exists and have read write permissions
-data.basedir.path=/tmp/dolphinscheduler
-
-# directory path for user data download. self configuration, please make sure the directory exists and have read write permissions
-data.download.basedir.path=/tmp/dolphinscheduler/download
-
-# process execute directory. self configuration, please make sure the directory exists and have read write permissions
-process.exec.basepath=/tmp/dolphinscheduler/exec
+# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
+#resource.upload.path=/dolphinscheduler
+# user data local directory path, please make sure the directory exists and have read write permissions
+#data.basedir.path=/tmp/dolphinscheduler
# whether kerberos starts
-hadoop.security.authentication.startup.state=false
+#hadoop.security.authentication.startup.state=false
# java.security.krb5.conf path
-java.security.krb5.conf.path=/opt/krb5.conf
+#java.security.krb5.conf.path=/opt/krb5.conf
# loginUserFromKeytab user
-login.user.keytab.username=hdfs-mycluster@ESZ.COM
+#login.user.keytab.username=hdfs-mycluster@ESZ.COM
# loginUserFromKeytab path
-login.user.keytab.path=/opt/hdfs.headless.keytab
-
-# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions
-dolphinscheduler.env.path=/opt/dolphinscheduler_env.sh
+#login.user.keytab.path=/opt/hdfs.headless.keytab
#resource.view.suffixs
-resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
-
-# is development state? default "false"
-development.state=true
+#resource.view.suffixs=txt,log,sh,conf,cfg,py,java,sql,hql,xml,properties
+# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
+hdfs.root.user=hdfs
-# ha or single namenode,If namenode ha needs to copy core-site.xml and hdfs-site.xml
-# to the conf directory,support s3,for example : s3a://dolphinscheduler
-fs.defaultFS=hdfs://mycluster:8020
+# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
+fs.defaultFS=hdfs://l:8020
-# s3 need,s3 endpoint
-fs.s3a.endpoint=http://192.168.199.91:9010
+# if resource.storage.type=S3,s3 endpoint
+#fs.s3a.endpoint=http://192.168.199.91:9010
-# s3 need,s3 access key
-fs.s3a.access.key=A3DXS30FO22544RE
+# if resource.storage.type=S3,s3 access key
+#fs.s3a.access.key=A3DXS30FO22544RE
-# s3 need,s3 secret key
-fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
+# if resource.storage.type=S3,s3 secret key
+#fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
-#resourcemanager ha note this need ips , this empty if single
+# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty TODO
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
-# If it is a single resourcemanager, you only need to configure one host name. If it is resourcemanager HA, the default configuration is fine
+# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
-
+# system env path. self configuration, please make sure the directory and file exists and have read write execute permissions, TODO
+#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
index e0c3761..58e3407 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/TaskRecordDao.java
@@ -22,9 +22,7 @@ import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskRecord;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.dolphinscheduler.dao.utils.PropertyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,25 +41,11 @@ public class TaskRecordDao {
private static Logger logger = LoggerFactory.getLogger(TaskRecordDao.class.getName());
/**
- * load conf
- */
- private static Configuration conf;
-
- static {
- try {
- conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
- }catch (ConfigurationException e){
- logger.error("load configuration exception",e);
- System.exit(1);
- }
- }
-
- /**
* get task record flag
* @return whether startup taskrecord
*/
public static boolean getTaskRecordFlag(){
- return conf.getBoolean(Constants.TASK_RECORD_FLAG,false);
+ return PropertyUtils.getBoolean(Constants.TASK_RECORD_FLAG,false);
}
/**
* create connection
@@ -72,9 +56,9 @@ public class TaskRecordDao {
return null;
}
String driver = "com.mysql.jdbc.Driver";
- String url = conf.getString(Constants.TASK_RECORD_URL);
- String username = conf.getString(Constants.TASK_RECORD_USER);
- String password = conf.getString(Constants.TASK_RECORD_PWD);
+ String url = PropertyUtils.getString(Constants.TASK_RECORD_URL);
+ String username = PropertyUtils.getString(Constants.TASK_RECORD_USER);
+ String password = PropertyUtils.getString(Constants.TASK_RECORD_PWD);
Connection conn = null;
try {
//classLoader,load driver
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java
index a3bc6a0..6aad2a3 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/ConnectionFactory.java
@@ -58,32 +58,7 @@ public class ConnectionFactory extends SpringConnectionFactory{
*/
public static DruidDataSource getDataSource() {
- DruidDataSource druidDataSource = new DruidDataSource();
-
- druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
- druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL));
- druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME));
- druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD));
- druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY));
-
- druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS));
- druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE));
- druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW));
- druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN));
- druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE));
-
- druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE));
- druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE));
- druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT));
- druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE));
- druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE));
- druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS));
- druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS));
- druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS));
- druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT));
- //auto commit
- druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT));
-
+ DruidDataSource druidDataSource = dataSource();
return druidDataSource;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
index cb9f22e..4bdbaa2 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/datasource/SpringConnectionFactory.java
@@ -25,6 +25,7 @@ import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.utils.PropertyUtils;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.type.JdbcType;
@@ -48,19 +49,6 @@ public class SpringConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(SpringConnectionFactory.class);
- /**
- * Load configuration file
- */
- protected static org.apache.commons.configuration.Configuration conf;
-
- static {
- try {
- conf = new PropertiesConfiguration(Constants.APPLICATION_PROPERTIES);
- } catch (ConfigurationException e) {
- logger.error("load configuration exception", e);
- System.exit(1);
- }
- }
/**
* pagination interceptor
@@ -76,33 +64,33 @@ public class SpringConnectionFactory {
* @return druid dataSource
*/
@Bean(destroyMethod="")
- public DruidDataSource dataSource() {
+ public static DruidDataSource dataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
- druidDataSource.setDriverClassName(conf.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
- druidDataSource.setUrl(conf.getString(Constants.SPRING_DATASOURCE_URL));
- druidDataSource.setUsername(conf.getString(Constants.SPRING_DATASOURCE_USERNAME));
- druidDataSource.setPassword(conf.getString(Constants.SPRING_DATASOURCE_PASSWORD));
- druidDataSource.setValidationQuery(conf.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY));
-
- druidDataSource.setPoolPreparedStatements(conf.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS));
- druidDataSource.setTestWhileIdle(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE));
- druidDataSource.setTestOnBorrow(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW));
- druidDataSource.setTestOnReturn(conf.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN));
- druidDataSource.setKeepAlive(conf.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE));
-
- druidDataSource.setMinIdle(conf.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE));
- druidDataSource.setMaxActive(conf.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE));
- druidDataSource.setMaxWait(conf.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT));
- druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(conf.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE));
- druidDataSource.setInitialSize(conf.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE));
- druidDataSource.setTimeBetweenEvictionRunsMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS));
- druidDataSource.setTimeBetweenConnectErrorMillis(conf.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS));
- druidDataSource.setMinEvictableIdleTimeMillis(conf.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS));
- druidDataSource.setValidationQueryTimeout(conf.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT));
+ druidDataSource.setDriverClassName(PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME));
+ druidDataSource.setUrl(PropertyUtils.getString(Constants.SPRING_DATASOURCE_URL));
+ druidDataSource.setUsername(PropertyUtils.getString(Constants.SPRING_DATASOURCE_USERNAME));
+ druidDataSource.setPassword(PropertyUtils.getString(Constants.SPRING_DATASOURCE_PASSWORD));
+ druidDataSource.setValidationQuery(PropertyUtils.getString(Constants.SPRING_DATASOURCE_VALIDATION_QUERY,"SELECT 1"));
+
+ druidDataSource.setPoolPreparedStatements(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_POOL_PREPARED_STATEMENTS,true));
+ druidDataSource.setTestWhileIdle(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_WHILE_IDLE,true));
+ druidDataSource.setTestOnBorrow(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_BORROW,true));
+ druidDataSource.setTestOnReturn(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_TEST_ON_RETURN,true));
+ druidDataSource.setKeepAlive(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_KEEP_ALIVE,true));
+
+ druidDataSource.setMinIdle(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MIN_IDLE,5));
+ druidDataSource.setMaxActive(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_ACTIVE,50));
+ druidDataSource.setMaxWait(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_WAIT,60000));
+ druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_MAX_POOL_PREPARED_STATEMENT_PER_CONNECTION_SIZE,20));
+ druidDataSource.setInitialSize(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_INITIAL_SIZE,5));
+ druidDataSource.setTimeBetweenEvictionRunsMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_EVICTION_RUNS_MILLIS,60000));
+ druidDataSource.setTimeBetweenConnectErrorMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_TIME_BETWEEN_CONNECT_ERROR_MILLIS,60000));
+ druidDataSource.setMinEvictableIdleTimeMillis(PropertyUtils.getLong(Constants.SPRING_DATASOURCE_MIN_EVICTABLE_IDLE_TIME_MILLIS,300000));
+ druidDataSource.setValidationQueryTimeout(PropertyUtils.getInt(Constants.SPRING_DATASOURCE_VALIDATION_QUERY_TIMEOUT,3));
//auto commit
- druidDataSource.setDefaultAutoCommit(conf.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT));
+ druidDataSource.setDefaultAutoCommit(PropertyUtils.getBoolean(Constants.SPRING_DATASOURCE_DEFAULT_AUTO_COMMIT,true));
return druidDataSource;
}
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java
index cdd481a..47cfadb 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/PropertyUtils.java
@@ -49,7 +49,7 @@ public class PropertyUtils {
* init
*/
private void init(){
- String[] propertyFiles = new String[]{Constants.APPLICATION_PROPERTIES};
+ String[] propertyFiles = new String[]{Constants.DATASOURCE_PROPERTIES};
for (String fileName : propertyFiles) {
InputStream fis = null;
try {
@@ -77,6 +77,17 @@ public class PropertyUtils {
return properties.getProperty(key);
}
+ /**
+ * get property value
+ *
+ * @param key property name
+ * @param defaultVal default value
+ * @return property value
+ */
+ public static String getString(String key, String defaultVal) {
+ String val = properties.getProperty(key.trim());
+ return val == null ? defaultVal : val;
+ }
/**
* get property value
@@ -106,4 +117,46 @@ public class PropertyUtils {
}
return defaultValue;
}
+
+ /**
+ * get property value
+ *
+ * @param key property name
+ * @return property value
+ */
+ public static Boolean getBoolean(String key) {
+ String value = properties.getProperty(key.trim());
+ if(null != value){
+ return Boolean.parseBoolean(value);
+ }
+
+ return false;
+ }
+
+ /**
+ * get property value
+ *
+ * @param key property name
+ * @param defaultValue default value
+ * @return property value
+ */
+ public static Boolean getBoolean(String key, boolean defaultValue) {
+ String value = properties.getProperty(key.trim());
+ if(null != value){
+ return Boolean.parseBoolean(value);
+ }
+
+ return defaultValue;
+ }
+
+ /**
+ * get property long value
+ * @param key key
+ * @param defaultVal default value
+ * @return property value
+ */
+ public static long getLong(String key, long defaultVal) {
+ String val = getString(key);
+ return val == null ? defaultVal : Long.parseLong(val);
+ }
}
diff --git a/dolphinscheduler-dao/src/main/resources/application.properties b/dolphinscheduler-dao/src/main/resources/datasource.properties
similarity index 61%
rename from dolphinscheduler-dao/src/main/resources/application.properties
rename to dolphinscheduler-dao/src/main/resources/datasource.properties
index b79c1ed..3e06ab4 100644
--- a/dolphinscheduler-dao/src/main/resources/application.properties
+++ b/dolphinscheduler-dao/src/main/resources/datasource.properties
@@ -15,59 +15,61 @@
# limitations under the License.
#
-# base spring data source configuration
-spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
+
# postgre
-spring.datasource.driver-class-name=org.postgresql.Driver
-spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
+#spring.datasource.driver-class-name=org.postgresql.Driver
+#spring.datasource.url=jdbc:postgresql://localhost:5432/dolphinscheduler
# mysql
-#spring.datasource.driver-class-name=com.mysql.jdbc.Driver
-#spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
-spring.datasource.username=test
-spring.datasource.password=test
+spring.datasource.driver-class-name=com.mysql.jdbc.Driver
+spring.datasource.url=jdbc:mysql://localhost:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
+spring.datasource.username=root
+spring.datasource.password=root@123
+
+## base spring data source configuration todo need to remove
+#spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
# connection configuration
-spring.datasource.initialSize=5
+#spring.datasource.initialSize=5
# min connection number
-spring.datasource.minIdle=5
+#spring.datasource.minIdle=5
# max connection number
-spring.datasource.maxActive=50
+#spring.datasource.maxActive=50
# max wait time for get a connection in milliseconds. if configuring maxWait, fair locks are enabled by default and concurrency efficiency decreases.
# If necessary, unfair locks can be used by configuring the useUnfairLock attribute to true.
-spring.datasource.maxWait=60000
+#spring.datasource.maxWait=60000
# milliseconds for check to close free connections
-spring.datasource.timeBetweenEvictionRunsMillis=60000
+#spring.datasource.timeBetweenEvictionRunsMillis=60000
# the Destroy thread detects the connection interval and closes the physical connection in milliseconds if the connection idle time is greater than or equal to minEvictableIdleTimeMillis.
-spring.datasource.timeBetweenConnectErrorMillis=60000
+#spring.datasource.timeBetweenConnectErrorMillis=60000
# the longest time a connection remains idle without being evicted, in milliseconds
-spring.datasource.minEvictableIdleTimeMillis=300000
+#spring.datasource.minEvictableIdleTimeMillis=300000
#the SQL used to check whether the connection is valid requires a query statement. If validation Query is null, testOnBorrow, testOnReturn, and testWhileIdle will not work.
-spring.datasource.validationQuery=SELECT 1
+#spring.datasource.validationQuery=SELECT 1
#check whether the connection is valid for timeout, in seconds
-spring.datasource.validationQueryTimeout=3
+#spring.datasource.validationQueryTimeout=3
# when applying for a connection, if it is detected that the connection is idle longer than time Between Eviction Runs Millis,
# validation Query is performed to check whether the connection is valid
-spring.datasource.testWhileIdle=true
+#spring.datasource.testWhileIdle=true
#execute validation to check if the connection is valid when applying for a connection
-spring.datasource.testOnBorrow=true
+#spring.datasource.testOnBorrow=true
#execute validation to check if the connection is valid when the connection is returned
-spring.datasource.testOnReturn=false
-spring.datasource.defaultAutoCommit=true
-spring.datasource.keepAlive=true
+#spring.datasource.testOnReturn=false
+#spring.datasource.defaultAutoCommit=true
+#spring.datasource.keepAlive=true
# open PSCache, specify count PSCache for every connection
-spring.datasource.poolPreparedStatements=true
-spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
+#spring.datasource.poolPreparedStatements=true
+#spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
-spring.datasource.spring.datasource.filters=stat,wall,log4j
-spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+#spring.datasource.filters=stat,wall,log4j
+#spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
index e7b6327..da2405e 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/consumer/TaskUpdateQueueConsumer.java
@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.server.master.consumer;
import com.alibaba.fastjson.JSONObject;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.enums.UdfType;
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
index 9d96264..9fa8f81 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/quartz/QuartzExecutors.java
@@ -16,13 +16,20 @@
*/
package org.apache.dolphinscheduler.service.quartz;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Schedule;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
+import org.quartz.impl.jdbcjobstore.JobStoreTX;
+import org.quartz.impl.jdbcjobstore.PostgreSQLDelegate;
+import org.quartz.impl.jdbcjobstore.StdJDBCDelegate;
import org.quartz.impl.matchers.GroupMatcher;
+import org.quartz.simpl.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +37,7 @@ import java.util.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import static org.apache.dolphinscheduler.common.Constants.ORG_POSTGRESQL_DRIVER;
import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;
@@ -59,7 +67,21 @@ public class QuartzExecutors {
*/
private static volatile QuartzExecutors INSTANCE = null;
- private QuartzExecutors() {}
+ /**
+ * load conf
+ */
+ private static Configuration conf;
+
+ static {
+ try {
+ conf = new PropertiesConfiguration(Constants.QUARTZ_PROPERTIES_PATH);
+ }catch (ConfigurationException e){
+ logger.warn("not loaded quartz configuration file, will used default value",e);
+ }
+ }
+
+ private QuartzExecutors() {
+ }
/**
* thread safe and performance promote
@@ -87,7 +109,32 @@ public class QuartzExecutors {
*/
private void init() {
try {
- SchedulerFactory schedulerFactory = new StdSchedulerFactory(Constants.QUARTZ_PROPERTIES_PATH);
+ StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
+ Properties properties = new Properties();
+
+ String dataSourceDriverClass = org.apache.dolphinscheduler.dao.utils.PropertyUtils.getString(Constants.SPRING_DATASOURCE_DRIVER_CLASS_NAME);
+ if (dataSourceDriverClass.contains(ORG_POSTGRESQL_DRIVER)){
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, PostgreSQLDelegate.class.getName()));
+ } else {
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DRIVERDELEGATECLASS, StdJDBCDelegate.class.getName()));
+ }
+ properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCENAME, Constants.QUARTZ_INSTANCENAME));
+ properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, conf.getString(Constants.ORG_QUARTZ_SCHEDULER_INSTANCEID, Constants.QUARTZ_INSTANCEID));
+ properties.setProperty(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,conf.getString(Constants.ORG_QUARTZ_SCHEDULER_MAKESCHEDULERTHREADDAEMON,Constants.STRING_TRUE));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_USEPROPERTIES,Constants.STRING_TRUE));
+ properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_CLASS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_CLASS, SimpleThreadPool.class.getName()));
+ properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_MAKETHREADSDAEMONS,Constants.STRING_TRUE));
+ properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADCOUNT, Constants.QUARTZ_THREADCOUNT));
+ properties.setProperty(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY,conf.getString(Constants.ORG_QUARTZ_THREADPOOL_THREADPRIORITY, Constants.QUARTZ_THREADPRIORITY));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLASS,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLASS, JobStoreTX.class.getName()));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_TABLEPREFIX, Constants.QUARTZ_TABLE_PREFIX));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_ISCLUSTERED,Constants.STRING_TRUE));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_MISFIRETHRESHOLD, Constants.QUARTZ_MISFIRETHRESHOLD));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_CLUSTERCHECKININTERVAL, Constants.QUARTZ_CLUSTERCHECKININTERVAL));
+ properties.setProperty(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE,conf.getString(Constants.ORG_QUARTZ_JOBSTORE_DATASOURCE, Constants.QUARTZ_DATASOURCE));
+ properties.setProperty(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,conf.getString(Constants.ORG_QUARTZ_DATASOURCE_MYDS_CONNECTIONPROVIDER_CLASS,DruidConnectionProvider.class.getName()));
+
+ schedulerFactory.initialize(properties);
scheduler = schedulerFactory.getScheduler();
} catch (SchedulerException e) {
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
index c6bdfc3..5bdc6f8 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/zk/ZookeeperConfig.java
@@ -24,7 +24,7 @@ import org.springframework.stereotype.Component;
* zookeeper conf
*/
@Component
-@PropertySource("classpath:common.properties")
+@PropertySource("classpath:zookeeper.properties")
public class ZookeeperConfig {
//zk connect config
diff --git a/dolphinscheduler-service/src/main/resources/quartz.properties b/dolphinscheduler-service/src/main/resources/quartz.properties
index 60a0968..3bd82c7 100644
--- a/dolphinscheduler-service/src/main/resources/quartz.properties
+++ b/dolphinscheduler-service/src/main/resources/quartz.properties
@@ -18,45 +18,36 @@
#============================================================================
# Configure Main Scheduler Properties
#============================================================================
-org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
+#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
-# postgre
-#org.quartz.dataSource.myDs.driver = org.postgresql.Driver
-#org.quartz.dataSource.myDs.URL = jdbc:postgresql://localhost:5432/dolphinscheduler?characterEncoding=utf8
-# mysql
-#org.quartz.dataSource.myDs.driver = com.mysql.jdbc.Driver
-#org.quartz.dataSource.myDs.URL = jdbc:mysql://localhost:3306/dolphinscheduler?characterEncoding=utf8
-#org.quartz.dataSource.myDs.user = root
-#org.quartz.dataSource.myDs.password = 123456
-
-org.quartz.scheduler.instanceName = DolphinScheduler
-org.quartz.scheduler.instanceId = AUTO
-org.quartz.scheduler.makeSchedulerThreadDaemon = true
-org.quartz.jobStore.useProperties = false
+#org.quartz.scheduler.instanceName = DolphinScheduler
+#org.quartz.scheduler.instanceId = AUTO
+#org.quartz.scheduler.makeSchedulerThreadDaemon = true
+#org.quartz.jobStore.useProperties = false
#============================================================================
# Configure ThreadPool
#============================================================================
-org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
-org.quartz.threadPool.makeThreadsDaemons = true
-org.quartz.threadPool.threadCount = 25
-org.quartz.threadPool.threadPriority = 5
+#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
+#org.quartz.threadPool.makeThreadsDaemons = true
+#org.quartz.threadPool.threadCount = 25
+#org.quartz.threadPool.threadPriority = 5
#============================================================================
# Configure JobStore
#============================================================================
-org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
+#org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
-org.quartz.jobStore.tablePrefix = QRTZ_
-org.quartz.jobStore.isClustered = true
-org.quartz.jobStore.misfireThreshold = 60000
-org.quartz.jobStore.clusterCheckinInterval = 5000
+#org.quartz.jobStore.tablePrefix = QRTZ_
+#org.quartz.jobStore.isClustered = true
+#org.quartz.jobStore.misfireThreshold = 60000
+#org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.acquireTriggersWithinLock=true
-org.quartz.jobStore.dataSource = myDs
+#org.quartz.jobStore.dataSource = myDs
#============================================================================
# Configure Datasources
#============================================================================
-org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider
\ No newline at end of file
+#org.quartz.dataSource.myDs.connectionProvider.class = org.apache.dolphinscheduler.service.quartz.DruidConnectionProvider
\ No newline at end of file
diff --git a/dolphinscheduler-service/src/main/resources/zookeeper.properties b/dolphinscheduler-service/src/main/resources/zookeeper.properties
new file mode 100644
index 0000000..1d3c53a
--- /dev/null
+++ b/dolphinscheduler-service/src/main/resources/zookeeper.properties
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# task queue implementation, default "zookeeper"
+dolphinscheduler.queue.impl=zookeeper
+
+# zookeeper cluster. multiple are separated by commas. eg. 192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181
+zookeeper.quorum=192.168.xx.xx:2181,192.168.xx.xx:2181,192.168.xx.xx:2181
+
+# dolphinscheduler root directory
+#zookeeper.dolphinscheduler.root=/dolphinscheduler
+
+# dolphinscheduler failover directory
+#zookeeper.session.timeout=60000
+#zookeeper.connection.timeout=300
+#zookeeper.retry.base.sleep=100
+#zookeeper.retry.max.sleep=30000
+#zookeeper.retry.maxtime=5
\ No newline at end of file