You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/31 06:59:56 UTC
[06/21] kylin git commit: minor, job diag support different hadoop env
minor: job diagnosis now supports different Hadoop environments
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c800d6e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c800d6e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c800d6e
Branch: refs/heads/KYLIN-2501
Commit: 6c800d6e825c36ebe677b19efd7f02a7e3b9509b
Parents: 2ab9bf2
Author: lidongsjtu <li...@apache.org>
Authored: Thu Mar 16 16:48:02 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Mar 16 17:22:31 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/tool/MrJobInfoExtractor.java | 51 ++++++++++++++------
1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/6c800d6e/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index b9bf2de..ca4c7e1 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
public class MrJobInfoExtractor extends AbstractInfoExtractor {
@@ -59,6 +61,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
private static final int HTTP_RETRY = 3;
+ private Map<String, String> nodeInfoMap = Maps.newHashMap();
+
+ private String jobHistoryUrlBase;
+ private String yarnMasterUrlBase;
+
public MrJobInfoExtractor() {
packageType = "MR";
@@ -71,14 +78,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
extractor.execute(args);
}
- private String getRestCheckUrl() {
+ private void extractRestCheckUrl() {
KylinConfig config = KylinConfig.getInstanceFromEnv();
final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
- Pattern pattern = Pattern.compile("(http://)(.*):.*");
+ Pattern pattern = Pattern.compile("(http://)([^:]*):([^/])*.*");
if (yarnStatusCheckUrl != null) {
Matcher m = pattern.matcher(yarnStatusCheckUrl);
if (m.matches()) {
- return m.group(1) + m.group(2) + ":19888";
+ jobHistoryUrlBase = m.group(1) + m.group(2) + ":19888";
+ yarnMasterUrlBase = m.group(1) + m.group(2) + ":" + m.group(3);
}
}
logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set, read from hadoop configuration");
@@ -91,14 +99,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
}
if (StringUtils.isEmpty(rmWebHost)) {
- return null;
+ return;
}
if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
rmWebHost = "http://" + rmWebHost;
}
Matcher m = pattern.matcher(rmWebHost);
Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
- return m.group(1) + m.group(2) + ":19888";
+ yarnMasterUrlBase = rmWebHost;
+ jobHistoryUrlBase = m.group(1) + HAUtil.getConfValueForRMInstance("mapreduce.jobhistory.webapp.address", m.group(2) + ":19888", conf);
}
private String getHttpResponse(String url) {
@@ -122,7 +131,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
return msg;
}
- private void extractTaskDetail(String taskId, String nodeId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
+ private void extractTaskDetail(String taskId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
try {
if (StringUtils.isEmpty(taskId)) {
return;
@@ -137,8 +146,9 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
String succAttemptId = taskAttempt.textValue();
String attemptInfo = saveHttpResponseQuietly(new File(destDir, "task_attempts.json"), taskUrlBase + "/attempts/" + succAttemptId);
- JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt").path("assignedContainerId");
- String containerId = attemptAttempt.textValue();
+ JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt");
+ String containerId = attemptAttempt.get("assignedContainerId").textValue();
+ String nodeId = nodeInfoMap.get(attemptAttempt.get("nodeHttpAddress").textValue());
// save task counters
saveHttpResponseQuietly(new File(destDir, "task_counters.json"), taskUrlBase + "/counters");
@@ -173,16 +183,25 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
try {
boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
- String jobUrlBase = getRestCheckUrl();
- String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+ extractRestCheckUrl();
+
+ Preconditions.checkNotNull(jobHistoryUrlBase);
+ Preconditions.checkNotNull(yarnMasterUrlBase);
+
+ String jobUrlPrefix = jobHistoryUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
+ // cache node info
+ String nodeUrl = yarnMasterUrlBase + "/ws/v1/cluster/nodes";
+ String nodeResponse = getHttpResponse(nodeUrl);
+ JsonNode nodes = new ObjectMapper().readTree(nodeResponse).path("nodes").path("node");
+ for (JsonNode node : nodes) {
+ nodeInfoMap.put(node.path("nodeHTTPAddress").textValue(), node.path("id").textValue());
+ }
// save mr job stats
String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix);
String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
- String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
- String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
-
// save mr job conf
saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf");
@@ -191,7 +210,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
// save task details
if (includeTaskDetails) {
- extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user);
+ extractTaskDetails(exportDir, jobUrlPrefix, jobHistoryUrlBase, user);
}
} catch (Exception e) {
@@ -199,7 +218,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
}
}
- private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) {
+ private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String user) {
try {
String tasksUrl = jobUrlPrefix + "/tasks/";
String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl);
@@ -324,7 +343,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
File tasksDir = new File(exportDir, "tasks");
FileUtils.forceMkdir(tasksDir);
for (String taskId : selectedTaskIds) {
- extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, jobUrlBase);
+ extractTaskDetail(taskId, user, tasksDir, tasksUrl, jobUrlBase);
}
} catch (Exception e) {
logger.warn("Failed to get mr tasks rest response.", e);