You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/27 07:25:41 UTC
[28/41] incubator-kylin git commit: add user defined config for kylin.job.yarn.app.rest.check.status.url
add user defined config for kylin.job.yarn.app.rest.check.status.url
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/2e7a7109
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/2e7a7109
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/2e7a7109
Branch: refs/heads/inverted-index
Commit: 2e7a7109a6e5d79748eafbccf13de1c92b35853f
Parents: eb4ea7d
Author: qianhao.zhou <qi...@ebay.com>
Authored: Fri Feb 13 18:19:17 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Fri Feb 13 18:19:17 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/KylinConfig.java | 8 +++--
.../kylin/job/common/MapReduceExecutable.java | 36 ++++++++++++++------
2 files changed, 31 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e7a7109/common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/KylinConfig.java b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 1a94bff..5656fd3 100644
--- a/common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -59,6 +59,8 @@ public class KylinConfig {
public static final String KYLIN_JOB_CONCURRENT_MAX_LIMIT = "kylin.job.concurrent.max.limit";
+ public static final String KYLIN_JOB_YARN_APP_REST_CHECK_URL = "kylin.job.yarn.app.rest.check.status.url";
+
public static final String KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS = "kylin.job.yarn.app.rest.check.interval.seconds";
public static final String KYLIN_TMP_HDFS_DIR = "kylin.tmp.hdfs.dir";
@@ -389,9 +391,9 @@ public class KylinConfig {
return getOptional(KYLIN_TMP_HDFS_DIR, "/tmp/kylin");
}
- // public String getYarnStatusServiceUrl() {
- // return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_STATUS_URL, null);
- // }
+ public String getYarnStatusCheckUrl() {
+ return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_URL, null);
+ }
public int getYarnStatusCheckIntervalSeconds() {
return Integer.parseInt(getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS, "60"));
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2e7a7109/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
index 20791a0..1ea1620 100644
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.common;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.exception.ExecuteException;
@@ -117,17 +118,12 @@ public class MapReduceExecutable extends AbstractExecutable {
}
final StringBuilder output = new StringBuilder();
final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
- String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
- if (StringUtils.isEmpty(rmWebHost)) {
- return new ExecuteResult(ExecuteResult.State.ERROR, "yarn.resourcemanager.webapp.address is empty");
- }
- if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
- //do nothing
- } else {
- rmWebHost = "http://" + rmWebHost;
+
+ final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
+ if (restStatusCheckUrl == null) {
+ logger.error("restStatusCheckUrl is null");
+ return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
}
- logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
- final String restStatusCheckUrl = rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
String mrJobId = hadoopCmdOutput.getMrJobId();
HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
JobStepStatusEnum status = JobStepStatusEnum.NEW;
@@ -168,6 +164,26 @@ public class MapReduceExecutable extends AbstractExecutable {
}
}
+ private String getRestStatusCheckUrl(Job job, KylinConfig config) {
+ final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
+ if (yarnStatusCheckUrl != null) {
+ return yarnStatusCheckUrl;
+ } else {
+ logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
+ }
+ String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
+ if (StringUtils.isEmpty(rmWebHost)) {
+ return null;
+ }
+ if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
+ //do nothing
+ } else {
+ rmWebHost = "http://" + rmWebHost;
+ }
+ logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
+ return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
+ }
+
public long getMapReduceWaitTime() {
return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
}