Posted to commits@dolphinscheduler.apache.org by li...@apache.org on 2020/05/30 15:07:26 UTC

[incubator-dolphinscheduler] branch dev updated: add job history to judge application status/2625 (#2848)

This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1513aae  add job history to judge application status/2625 (#2848)
1513aae is described below

commit 1513aae3ceb6f70633f64cc3a290b8f5ec6ab042
Author: itbasketplayer <82...@qq.com>
AuthorDate: Sat May 30 23:07:17 2020 +0800

    add job history to judge application status/2625 (#2848)
    
    * use the job history status url when the application number threshold is reached (default 10000; some clusters lower it, e.g. to 1000)
    
    Co-authored-by: yuhaibin@lizhi.fm <35716fc5847f6d154cf556296453ca91>
    Co-authored-by: dailidong <da...@gmail.com>
---
 .../apache/dolphinscheduler/common/Constants.java  |  5 ++
 .../dolphinscheduler/common/utils/HadoopUtils.java | 59 ++++++++++++++--------
 .../src/main/resources/common.properties           | 14 ++---
 .../common/utils/HadoopUtilsTest.java              |  6 +++
 4 files changed, 58 insertions(+), 26 deletions(-)
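
In short: the YARN ResourceManager only retains a bounded number of completed
applications (yarn.resourcemanager.max-completed-applications, default 10000; some
clusters lower it). Once a finished application is evicted, the RM's apps REST
endpoint no longer returns it, so this patch falls back to the MapReduce Job History
Server. A minimal sketch of the resulting lookup, condensed from the diff below
(HttpUtils and fastjson are the project's own helpers used in the patch, the ark1
URLs are the placeholder templates from common.properties, and the wrapper class
is hypothetical):

    import com.alibaba.fastjson.JSON;
    import org.apache.dolphinscheduler.common.utils.HttpUtils;

    public class YarnStatusFallbackSketch {
        private static final String APP_URL = "http://ark1:8088/ws/v1/cluster/apps/%s";
        private static final String HISTORY_URL = "http://ark1:19888/ws/v1/history/mapreduce/jobs/%s";

        /** Ask the ResourceManager first; if it has evicted the app, ask the job history server. */
        public static String finalStatus(String applicationId) {
            String response = HttpUtils.get(String.format(APP_URL, applicationId));
            if (response != null) {
                // RM payload shape: {"app": {..., "finalStatus": "SUCCEEDED"}}
                return JSON.parseObject(response).getJSONObject("app").getString("finalStatus");
            }
            // application_<timestamp>_<seq> -> job_<timestamp>_<seq>
            String jobId = applicationId.replace("application", "job");
            response = HttpUtils.get(String.format(HISTORY_URL, jobId));
            // history payload shape: {"job": {..., "state": "SUCCEEDED"}}
            return response == null ? null
                    : JSON.parseObject(response).getJSONObject("job").getString("state");
        }
    }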

diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index effa4f0..fc09960 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -104,6 +104,11 @@ public final class Constants {
     public static final String YARN_APPLICATION_STATUS_ADDRESS = "yarn.application.status.address";
 
     /**
+     * yarn.job.history.status.address
+     */
+    public static final String YARN_JOB_HISTORY_STATUS_ADDRESS = "yarn.job.history.status.address";
+
+    /**
      * hdfs configuration
      * hdfs.root.user
      */
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
index 963aff5..1544b44 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/HadoopUtils.java
@@ -16,16 +16,16 @@
  */
 package org.apache.dolphinscheduler.common.utils;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONException;
+import com.alibaba.fastjson.JSONObject;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import org.apache.commons.io.IOUtils;
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONException;
-import com.alibaba.fastjson.JSONObject;
-import org.apache.commons.io.IOUtils;
 import org.apache.dolphinscheduler.common.enums.ResourceType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
@@ -59,6 +59,7 @@ public class HadoopUtils implements Closeable {
     public static final String resourceUploadPath = PropertyUtils.getString(RESOURCE_UPLOAD_PATH, "/dolphinscheduler");
     public static final String rmHaIds = PropertyUtils.getString(Constants.YARN_RESOURCEMANAGER_HA_RM_IDS);
     public static final String appAddress = PropertyUtils.getString(Constants.YARN_APPLICATION_STATUS_ADDRESS);
+    public static final String jobHistoryAddress = PropertyUtils.getString(Constants.YARN_JOB_HISTORY_STATUS_ADDRESS);
 
     private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
 
@@ -114,11 +115,11 @@ public class HadoopUtils implements Closeable {
             String resourceStorageType = PropertyUtils.getString(Constants.RESOURCE_STORAGE_TYPE);
             ResUploadType resUploadType = ResUploadType.valueOf(resourceStorageType);
 
-            if (resUploadType == ResUploadType.HDFS){
-                if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE,false)){
+            if (resUploadType == ResUploadType.HDFS) {
+                if (PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false)) {
                     System.setProperty(Constants.JAVA_SECURITY_KRB5_CONF,
                             PropertyUtils.getString(Constants.JAVA_SECURITY_KRB5_CONF_PATH));
-                    configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION,"kerberos");
+                    configuration.set(Constants.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
                     hdfsUser = "";
                     UserGroupInformation.setConfiguration(configuration);
                     UserGroupInformation.loginUserFromKeytab(PropertyUtils.getString(Constants.LOGIN_USER_KEY_TAB_USERNAME),
@@ -195,7 +196,7 @@ public class HadoopUtils implements Closeable {
          */
         String appUrl = "";
         //not use resourcemanager
-        if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)){
+        if (rmHaIds.contains(Constants.YARN_RESOURCEMANAGER_HA_XX)) {
 
             yarnEnabled = false;
             logger.warn("should not step here");
@@ -212,6 +213,18 @@ public class HadoopUtils implements Closeable {
         return String.format(appUrl, applicationId);
     }
 
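+    /**
+     * get job history url
+     *
+     * @param applicationId application id
+     * @return job history url of the corresponding mapreduce job
+     */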
+    public String getJobHistoryUrl(String applicationId) {
+        //eg:application_1587475402360_712719 -> job_1587475402360_712719
+        String jobId = applicationId.replace("application", "job");
+        return String.format(jobHistoryAddress, jobId);
+    }
+
     /**
      * cat file on hdfs
      *
@@ -389,9 +396,10 @@ public class HadoopUtils implements Closeable {
 
     /**
      * hadoop resourcemanager enabled or not
+     *
      * @return result
      */
-    public boolean isYarnEnabled()  {
+    public boolean isYarnEnabled() {
         return yarnEnabled;
     }
 
@@ -407,12 +415,25 @@ public class HadoopUtils implements Closeable {
             return null;
         }
 
+        String result = Constants.FAILED;
         String applicationUrl = getApplicationUrl(applicationId);
+        logger.info("applicationUrl={}", applicationUrl);
 
         String responseContent = HttpUtils.get(applicationUrl);
-
-        JSONObject jsonObject = JSON.parseObject(responseContent);
-        String result = jsonObject.getJSONObject("app").getString("finalStatus");
+        if (responseContent != null) {
+            JSONObject jsonObject = JSON.parseObject(responseContent);
+            result = jsonObject.getJSONObject("app").getString("finalStatus");
+        } else {
+            //may be in job history
+            String jobHistoryUrl = getJobHistoryUrl(applicationId);
+            logger.info("jobHistoryUrl={}", jobHistoryUrl);
+            responseContent = HttpUtils.get(jobHistoryUrl);
+            //the history server may not know the job either; guard against a null response
+            //so result keeps its FAILED default instead of throwing an NPE
+            if (responseContent != null) {
+                JSONObject jsonObject = JSON.parseObject(responseContent);
+                result = jsonObject.getJSONObject("job").getString("state");
+            }
+        }
 
         switch (result) {
             case Constants.ACCEPTED:
@@ -435,6 +453,7 @@ public class HadoopUtils implements Closeable {
 
     /**
      * get data hdfs path
+     *
      * @return data hdfs path
      */
     public static String getHdfsDataBasePath() {
@@ -452,7 +471,7 @@ public class HadoopUtils implements Closeable {
      * @param tenantCode tenant code
      * @return hdfs resource dir
      */
-    public static String getHdfsDir(ResourceType resourceType,String tenantCode) {
+    public static String getHdfsDir(ResourceType resourceType, String tenantCode) {
         String hdfsDir = "";
         if (resourceType.equals(ResourceType.FILE)) {
             hdfsDir = getHdfsResDir(tenantCode);
@@ -497,16 +516,16 @@ public class HadoopUtils implements Closeable {
     /**
      * get hdfs file name
      *
-     * @param resourceType  resource type
-     * @param tenantCode    tenant code
-     * @param fileName      file name
+     * @param resourceType resource type
+     * @param tenantCode   tenant code
+     * @param fileName     file name
      * @return hdfs file name
      */
     public static String getHdfsFileName(ResourceType resourceType, String tenantCode, String fileName) {
         if (fileName.startsWith("/")) {
-            fileName = fileName.replaceFirst("/","");
+            fileName = fileName.replaceFirst("/", "");
         }
-        return String.format("%s/%s", getHdfsDir(resourceType,tenantCode), fileName);
+        return String.format("%s/%s", getHdfsDir(resourceType, tenantCode), fileName);
     }
 
     /**
@@ -518,7 +537,7 @@ public class HadoopUtils implements Closeable {
      */
     public static String getHdfsResourceFileName(String tenantCode, String fileName) {
         if (fileName.startsWith("/")) {
-            fileName = fileName.replaceFirst("/","");
+            fileName = fileName.replaceFirst("/", "");
         }
         return String.format("%s/%s", getHdfsResDir(tenantCode), fileName);
     }
@@ -532,7 +551,7 @@ public class HadoopUtils implements Closeable {
      */
     public static String getHdfsUdfFileName(String tenantCode, String fileName) {
         if (fileName.startsWith("/")) {
-            fileName = fileName.replaceFirst("/","");
+            fileName = fileName.replaceFirst("/", "");
         }
         return String.format("%s/%s", getHdfsUdfDir(tenantCode), fileName);
     }
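
Note that getJobHistoryUrl only rewrites the "application" prefix of the ID, so the
fallback assumes a MapReduce job (applications of other types do not appear on the
MapReduce job history server). A sketch of the mapping, reusing the example ID from
the diff above and the template from common.properties:

    String template = "http://ark1:19888/ws/v1/history/mapreduce/jobs/%s";
    String applicationId = "application_1587475402360_712719";
    String jobId = applicationId.replace("application", "job"); // job_1587475402360_712719
    String url = String.format(template, jobId);
    // -> http://ark1:19888/ws/v1/history/mapreduce/jobs/job_1587475402360_712719
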
diff --git a/dolphinscheduler-common/src/main/resources/common.properties b/dolphinscheduler-common/src/main/resources/common.properties
index 3852c31..0cc118f 100644
--- a/dolphinscheduler-common/src/main/resources/common.properties
+++ b/dolphinscheduler-common/src/main/resources/common.properties
@@ -18,7 +18,7 @@
 # resource storage type : HDFS,S3,NONE
 resource.storage.type=NONE
 
-# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and have read write permissions。"/dolphinscheduler" is recommended
+# resource store on HDFS/S3 path, resource file will store to this hadoop hdfs path, self configuration, please make sure the directory exists on hdfs and has read/write permissions. "/dolphinscheduler" is recommended
 #resource.upload.path=/dolphinscheduler
 
 # user data local directory path, please make sure the directory exists and have read write permissions
@@ -42,16 +42,16 @@ resource.storage.type=NONE
 # if resource.storage.type=HDFS, the user need to have permission to create directories under the HDFS root path
 hdfs.root.user=hdfs
 
-# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
+# if resource.storage.type=S3,the value like: s3a://dolphinscheduler ; if resource.storage.type=HDFS, When namenode HA is enabled, you need to copy core-site.xml and hdfs-site.xml to conf dir
 fs.defaultFS=hdfs://mycluster:8020
 
-# if resource.storage.type=S3,s3 endpoint
+# if resource.storage.type=S3,s3 endpoint
 #fs.s3a.endpoint=http://192.168.199.91:9010
 
-# if resource.storage.type=S3,s3 access key
+# if resource.storage.type=S3,s3 access key
 #fs.s3a.access.key=A3DXS30FO22544RE
 
-# if resource.storage.type=S3,s3 secret key
+# if resource.storage.type=S3,s3 secret key
 #fs.s3a.secret.key=OloCLq3n+8+sdPHUhJ21XrSxTC+JK
 
 # if not use hadoop resourcemanager, please keep default value; if resourcemanager HA enable, please type the HA ips ; if resourcemanager is single, make this value empty
@@ -59,8 +59,10 @@ yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
 
 # If resourcemanager HA enable or not use resourcemanager, please keep the default value; If resourcemanager is single, you only need to replace ark1 to actual resourcemanager hostname.
 yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
+# job history status url, used when the application number threshold is reached (default 10000; some clusters lower it, e.g. to 1000)
+yarn.job.history.status.address=http://ark1:19888/ws/v1/history/mapreduce/jobs/%s
 
 # system env path
 #dolphinscheduler.env.path=env/dolphinscheduler_env.sh
 development.state=false
-kerberos.expire.time=7
\ No newline at end of file
+kerberos.expire.time=7
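
The new key mirrors yarn.application.status.address: both are format strings with a
single %s placeholder, and 19888 is the stock web port of the MapReduce Job History
Server, as 8088 is for the ResourceManager. An illustrative single-node setup,
keeping this file's ark1 placeholder hostname:

    # ResourceManager REST endpoint, queried first
    yarn.application.status.address=http://ark1:8088/ws/v1/cluster/apps/%s
    # MapReduce Job History Server REST endpoint, queried as a fallback
    yarn.job.history.status.address=http://ark1:19888/ws/v1/history/mapreduce/jobs/%s
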
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
index e239fe7..440f863 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/HadoopUtilsTest.java
@@ -191,6 +191,12 @@ public class HadoopUtilsTest {
     }
 
     @Test
+    public void getJobHistoryUrl() {
+        String jobHistoryUrl = hadoopUtils.getJobHistoryUrl("application_1529051418016_0167");
+        logger.info(jobHistoryUrl);
+    }
+
+    @Test
     public void catFileWithLimitTest() {
         List<String> stringList = new ArrayList<>();
         try {