You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/21 03:43:26 UTC

[19/30] kylin git commit: minor, job diag: support different Hadoop environments

minor, job diag: support different Hadoop environments


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c800d6e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c800d6e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c800d6e

Branch: refs/heads/master-hbase0.98
Commit: 6c800d6e825c36ebe677b19efd7f02a7e3b9509b
Parents: 2ab9bf2
Author: lidongsjtu <li...@apache.org>
Authored: Thu Mar 16 16:48:02 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Thu Mar 16 17:22:31 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/tool/MrJobInfoExtractor.java   | 51 ++++++++++++++------
 1 file changed, 35 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6c800d6e/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index b9bf2de..ca4c7e1 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 public class MrJobInfoExtractor extends AbstractInfoExtractor {
@@ -59,6 +61,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
     private static final int HTTP_RETRY = 3;
 
+    private Map<String, String> nodeInfoMap = Maps.newHashMap();
+
+    private String jobHistoryUrlBase;
+    private String yarnMasterUrlBase;
+
     public MrJobInfoExtractor() {
         packageType = "MR";
 
@@ -71,14 +78,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         extractor.execute(args);
     }
 
-    private String getRestCheckUrl() {
+    private void extractRestCheckUrl() {
         KylinConfig config = KylinConfig.getInstanceFromEnv();
         final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
-        Pattern pattern = Pattern.compile("(http://)(.*):.*");
+        Pattern pattern = Pattern.compile("(http://)([^:]*):([^/])*.*");
         if (yarnStatusCheckUrl != null) {
             Matcher m = pattern.matcher(yarnStatusCheckUrl);
             if (m.matches()) {
-                return m.group(1) + m.group(2) + ":19888";
+                jobHistoryUrlBase = m.group(1) + m.group(2) + ":19888";
+                yarnMasterUrlBase = m.group(1) + m.group(2) + ":" + m.group(3);
             }
         }
         logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set, read from hadoop configuration");
@@ -91,14 +99,15 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
         }
         if (StringUtils.isEmpty(rmWebHost)) {
-            return null;
+            return;
         }
         if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
             rmWebHost = "http://" + rmWebHost;
         }
         Matcher m = pattern.matcher(rmWebHost);
         Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
-        return m.group(1) + m.group(2) + ":19888";
+        yarnMasterUrlBase = rmWebHost;
+        jobHistoryUrlBase = m.group(1) + HAUtil.getConfValueForRMInstance("mapreduce.jobhistory.webapp.address", m.group(2) + ":19888", conf);
     }
 
     private String getHttpResponse(String url) {
@@ -122,7 +131,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         return msg;
     }
 
-    private void extractTaskDetail(String taskId, String nodeId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
+    private void extractTaskDetail(String taskId, String user, File exportDir, String taskUrl, String urlBase) throws IOException {
         try {
             if (StringUtils.isEmpty(taskId)) {
                 return;
@@ -137,8 +146,9 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             String succAttemptId = taskAttempt.textValue();
 
             String attemptInfo = saveHttpResponseQuietly(new File(destDir, "task_attempts.json"), taskUrlBase + "/attempts/" + succAttemptId);
-            JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt").path("assignedContainerId");
-            String containerId = attemptAttempt.textValue();
+            JsonNode attemptAttempt = new ObjectMapper().readTree(attemptInfo).path("taskAttempt");
+            String containerId = attemptAttempt.get("assignedContainerId").textValue();
+            String nodeId = nodeInfoMap.get(attemptAttempt.get("nodeHttpAddress").textValue());
 
             // save task counters
             saveHttpResponseQuietly(new File(destDir, "task_counters.json"), taskUrlBase + "/counters");
@@ -173,16 +183,25 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         try {
             boolean includeTaskDetails = optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
             String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
-            String jobUrlBase = getRestCheckUrl();
-            String jobUrlPrefix = jobUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+            extractRestCheckUrl();
+
+            Preconditions.checkNotNull(jobHistoryUrlBase);
+            Preconditions.checkNotNull(yarnMasterUrlBase);
+
+            String jobUrlPrefix = jobHistoryUrlBase + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
+            // cache node info
+            String nodeUrl = yarnMasterUrlBase + "/ws/v1/cluster/nodes";
+            String nodeResponse = getHttpResponse(nodeUrl);
+            JsonNode nodes = new ObjectMapper().readTree(nodeResponse).path("nodes").path("node");
+            for (JsonNode node : nodes) {
+                nodeInfoMap.put(node.path("nodeHTTPAddress").textValue(), node.path("id").textValue());
+            }
 
             // save mr job stats
             String jobResponse = saveHttpResponseQuietly(new File(exportDir, "job.json"), jobUrlPrefix);
             String user = new ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
 
-            String jobAttemptResponse = saveHttpResponseQuietly(new File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
-            String nodeId = new ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
-
             // save mr job conf
             saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), jobUrlPrefix + "/conf");
 
@@ -191,7 +210,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
 
             // save task details
             if (includeTaskDetails) {
-                extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, nodeId, user);
+                extractTaskDetails(exportDir, jobUrlPrefix, jobHistoryUrlBase, user);
             }
 
         } catch (Exception e) {
@@ -199,7 +218,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         }
     }
 
-    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String nodeId, String user) {
+    private void extractTaskDetails(File exportDir, String jobUrlPrefix, String jobUrlBase, String user) {
         try {
             String tasksUrl = jobUrlPrefix + "/tasks/";
             String tasksResponse = saveHttpResponseQuietly(new File(exportDir, "job_tasks.json"), tasksUrl);
@@ -324,7 +343,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             File tasksDir = new File(exportDir, "tasks");
             FileUtils.forceMkdir(tasksDir);
             for (String taskId : selectedTaskIds) {
-                extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, jobUrlBase);
+                extractTaskDetail(taskId, user, tasksDir, tasksUrl, jobUrlBase);
             }
         } catch (Exception e) {
             logger.warn("Failed to get mr tasks rest response.", e);