You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/05/30 15:07:26 UTC
[incubator-dolphinscheduler] branch dev updated: add job history to judge application status/2625 (#2848)
This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 1513aae add job history to judge application status/2625 (#2848)
1513aae is described below
commit 1513aae3ceb6f70633f64cc3a290b8f5ec6ab042
Author: itbasketplayer <82...@qq.com>
AuthorDate: Sat May 30 23:07:17 2020 +0800
add job history to judge application status/2625 (#2848)
* Query the job history status URL when the application number threshold is reached (default 10000; it may have been set to 1000)
Co-authored-by: yuhaibin@lizhi.fm <35716fc5847f6d154cf556296453ca91>
Co-authored-by: dailidong <da...@gmail.com>
---
.../apache/dolphinscheduler/common/Constants.java | 5 ++
.../dolphinscheduler/common/utils/HadoopUtils.java | 59 ++++++++++++++--------
.../src/main/resources/common.properties | 14 ++---
.../common/utils/HadoopUtilsTest.java | 6 +++
4 files changed, 58 insertions(+), 26 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index effa4f0..fc09960 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -104,6 +104,11 @@ public final class Constants {
public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
/**
+ * yarn.job.history.status.address
+ */
+ public static final String YARN_JOB_HISTORY_STATUS_ADDRESS = "yarn.job.history.status.address";
+
+ /**
* hdfs configuration
* hdfs.root.user
*/
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 963aff5..1544b44 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -16,16 +16,16 @@
*/
package org.apache.dolphinscheduler.common.utils;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.io.IOUtils;
import org.apache.dolphinscheduler.common.enums.ResourceType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
@@ -59,6 +59,7 @@ public class HadoopUtils implements Closeable {
public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
+ public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
@@ -114,11 +115,11 @@ public class HadoopUtils implements Closeable {
String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
- if (resUploadType == ResUploadType.HDFS){
- if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
+ if (resUploadType == ResUploadType.HDFS) {
+ if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
- configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
+ configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
hdfsUser = "";
UserGroupInformation.setConfiguration(configuration);
UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
@@ -195,7 +196,7 @@ public class HadoopUtils implements Closeable {
*/
String appUrl = "";
//not use resourcemanager
- if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){
+ if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)) {
yarnEnabled = false;
logger.warn("should not step here");
@@ -212,6 +213,12 @@ public class HadoopUtils implements Closeable {
return String.format(appUrl, applicationId);
}
+ public String getJobHistoryUrl(String applicationId) {
+ //eg:application_1587475402360_712719 -> job_1587475402360_712719
+ String jobId = applicationId.replace("application", "job");
+ return String.format(jobHistoryAddress, jobId);
+ }
+
/**
* cat file on hdfs
*
@@ -389,9 +396,10 @@ public class HadoopUtils implements Closeable {
/**
* hadoop resourcemanager enabled or not
+ *
* @return result
*/
- public boolean isYarnEnabled() {
+ public boolean isYarnEnabled() {
return yarnEnabled;
}
@@ -407,12 +415,22 @@ public class HadoopUtils implements Closeable {
return null;
}
+ String result = Constants.FAILED;
String applicationUrl = getApplicationUrl(applicationId);
+ logger.info("applicationUrl={}", applicationUrl);
String responseContent = HttpUtils.get(applicationUrl);
-
- JSONObject jsonObject = JSON.parseObject(responseContent);
- String result = jsonObject.getJSONObject("app").getString("finalStatus");
+ if (responseContent != null) {
+ JSONObject jsonObject = JSON.parseObject(responseContent);
+ result = jsonObject.getJSONObject("app").getString("finalStatus");
+ } else {
+ //may be in job history
+ String jobHistoryUrl = getJobHistoryUrl(applicationId);
+ logger.info("jobHistoryUrl={}", jobHistoryUrl);
+ responseContent = HttpUtils.get(jobHistoryUrl);
+ JSONObject jsonObject = JSONObject.parseObject(responseContent);
+ result = jsonObject.getJSONObject("job").getString("state");
+ }
switch (result) {
case Constants.ACCEPTED:
@@ -435,6 +453,7 @@ public class HadoopUtils implements Closeable {
/**
* get data hdfs path
+ *
* @return data hdfs path
*/
public static String getHdfsDataBasePath() {
@@ -452,7 +471,7 @@ public class HadoopUtils implements Closeable {
* @param tenantCode tenant code
* @return hdfs resource dir
*/
- public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
+ public static String getHdfsDir(ResourceType resourceType, String tenantCode) {
String hdfsDir = "";
if (resourceType.equals(ResourceType.FILE)) {
hdfsDir = getHdfsResDir(tenantCode);
@@ -497,16 +516,16 @@ public class HadoopUtils implements Closeable {
/**
* get hdfs file name
*
- * @param resourceType resource type
- * @param tenantCode tenant code
- * @param fileName file name
+ * @param resourceType resource type
+ * @param tenantCode tenant code
+ * @param fileName file name
* @return hdfs file name
*/
public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
if (fileName.startsWith("/")) {
- fileName = fileName.replaceFirst("/","");
+ fileName = fileName.replaceFirst("/", "");
}
- return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
+ return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName);
}
/**
@@ -518,7 +537,7 @@ public class HadoopUtils implements Closeable {
*/
public static String getHdfsResourceFileName(String tenantCode, String fileName) {
if (fileName.startsWith("/")) {
- fileName = fileName.replaceFirst("/","");
+ fileName = fileName.replaceFirst("/", "");
}
return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
}
@@ -532,7 +551,7 @@ public class HadoopUtils implements Closeable {
*/
public static String getHdfsUdfFileName(String tenantCode, String fileName) {
if (fileName.startsWith("/")) {
- fileName = fileName.replaceFirst("/","");
+ fileName = fileName.replaceFirst("/", "");
}
return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
}
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 3852c31..0cc118f 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -18,7 +18,7 @@
# resource storage type : HDFS,S3,NONE
resource.storage.type=NONE
-# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
+# resource storage path on HDFS/S3; resource files will be stored under this path (self-configured). Please make sure the directory exists on HDFS and has read/write permissions. "/dolphinscheduler" is recommended
#resource.upload.path=/dolphinscheduler
# user data local directory path, please make sure the directory exists and have read write permissions
@@ -42,16 +42,16 @@ resource.storage.type=NONE
# if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
hdfs.root.user=hdfs
-# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
+# if resource.storage.type=S3, the value looks like s3a://dolphinscheduler; if resource.storage.type=HDFS and namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to the conf dir
fs.defaultFS=hdfs://mycluster:8020
-# if resource.storage.type=S3,s3 endpoint
+# if resource.storage.type=S3, s3 endpoint
#fs.s3a.endpoint=http://192.168.199.91:9010
-# if resource.storage.type=S3,s3 access key
+# if resource.storage.type=S3, s3 access key
#fs.s3a.access.key=A3DXS30FO22544RE
-# if resource.storage.type=S3,s3 secret key
+# if resource.storage.type=S3, s3 secret key
#fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
# if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
@@ -59,8 +59,10 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
+# job history status url, queried when the resourcemanager no longer returns the application, e.g. after the application number threshold is reached (default 10000, may have been set to 1000)
+yarn.job.history.status.address=http://ark1:19888/ws/v1/history/mapreduce/jobs/%s
# system env path
#dolphinscheduler.env.path=env/dolphinscheduler_env.sh
development.state=false
-kerberos.expire.time=7
\ No newline at end of file
+kerberos.expire.time=7
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
index e239fe7..440f863 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
@@ -191,6 +191,12 @@ public class HadoopUtilsTest {
}
@Test
+ public void getJobHistoryUrl(){
+ String application_1516778421218_0042 = hadoopUtils.getJobHistoryUrl("application_1529051418016_0167");
+ logger.info(application_1516778421218_0042);
+ }
+
+ @Test
public void catFileWithLimitTest() {
List<String> stringList = new ArrayList<>();
try {