You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/03 11:28:35 UTC

[21/50] incubator-kylin git commit: Add user-defined config for kylin.job.yarn.app.rest.check.status.url

Add user-defined config for kylin.job.yarn.app.rest.check.status.url


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2e7a7109
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2e7a7109
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2e7a7109

Branch: refs/heads/master
Commit: 2e7a7109a6e5d79748eafbccf13de1c92b35853f
Parents: eb4ea7d
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Feb 13 18:19:17 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Feb 13 18:19:17 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    |  8 +++--
 .../kylin/job/common/MapReduceExecutable.java   | 36 ++++++++++++++------
 2 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e7a7109/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 1a94bff..5656fd3 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -59,6 +59,8 @@ public class KylinConfig {
 
     public static final String KYLIN_JOB_CONCURRENT_MAX_LIMIT = "kylin.job.concurrent.max.limit";
 
+    public static final String KYLIN_JOB_YARN_APP_REST_CHECK_URL = "kylin.job.yarn.app.rest.check.status.url";
+
     public static final String KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS = "kylin.job.yarn.app.rest.check.interval.seconds";
 
     public static final String KYLIN_TMP_HDFS_DIR = "kylin.tmp.hdfs.dir";
@@ -389,9 +391,9 @@ public class KylinConfig {
         return getOptional(KYLIN_TMP_HDFS_DIR, "/tmp/kylin");
     }
 
-    //    public String getYarnStatusServiceUrl() {
-    //        return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_STATUS_URL, null);
-    //    }
+    public String getYarnStatusCheckUrl() {
+        return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_URL, null);
+    }
 
     public int getYarnStatusCheckIntervalSeconds() {
         return Integer.parseInt(getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS, "60"));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e7a7109/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index 20791a0..1ea1620 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.common;
 
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
@@ -117,17 +118,12 @@ public class MapReduceExecutable extends AbstractExecutable {
             }
             final StringBuilder output = new StringBuilder();
             final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-            String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
-            if (StringUtils.isEmpty(rmWebHost)) {
-                return new ExecuteResult(ExecuteResult.State.ERROR, "yarn.resourcemanager.webapp.address is empty");
-            }
-            if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
-                //do nothing
-            } else {
-                rmWebHost = "http://" + rmWebHost;
+
+            final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
+            if (restStatusCheckUrl == null) {
+                logger.error("restStatusCheckUrl is null");
+                return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
             }
-            logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
-            final String restStatusCheckUrl = rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
             String mrJobId = hadoopCmdOutput.getMrJobId();
             HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
             JobStepStatusEnum status = JobStepStatusEnum.NEW;
@@ -168,6 +164,26 @@ public class MapReduceExecutable extends AbstractExecutable {
         }
     }
 
+    private String getRestStatusCheckUrl(Job job, KylinConfig config) {
+        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
+        if (yarnStatusCheckUrl != null) {
+            return yarnStatusCheckUrl;
+        } else {
+            logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
+        }
+        String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
+        if (StringUtils.isEmpty(rmWebHost)) {
+            return null;
+        }
+        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
+            //do nothing
+        } else {
+            rmWebHost = "http://" + rmWebHost;
+        }
+        logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
+        return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
+    }
+
     public long getMapReduceWaitTime() {
         return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
     }