You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/21 03:43:15 UTC

[08/30] kylin git commit: minor, add MR job counters in job diagnosis

minor, add MR job counters in job diagnosis


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d4768c12
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d4768c12
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d4768c12

Branch: refs/heads/master-hbase0.98
Commit: d4768c12e423e96679e4d7db73c697f0378c0a94
Parents: e7510c2
Author: lidongsjtu <li...@apache.org>
Authored: Tue Mar 14 23:03:35 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Tue Mar 14 23:05:24 2017 +0800

----------------------------------------------------------------------
 build/bin/diag.sh                               |   2 +-
 build/conf/kylin-tools-log4j.properties         |   1 +
 .../apache/kylin/tool/JobDiagnosisInfoCLI.java  |   6 +-
 .../apache/kylin/tool/MrJobInfoExtractor.java   | 107 ++++++++++++++-----
 4 files changed, 86 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/build/bin/diag.sh
----------------------------------------------------------------------
diff --git a/build/bin/diag.sh b/build/bin/diag.sh
index e9354a2..a995774 100644
--- a/build/bin/diag.sh
+++ b/build/bin/diag.sh
@@ -52,7 +52,7 @@ then
 
     if [ ${#patient} -eq 36 ]; then
         hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-server-log4j.properties \
+        -Dlog4j.configuration=kylin-tool-log4j.properties \
         -Dcatalina.home=${tomcat_root} \
         org.apache.kylin.tool.JobDiagnosisInfoCLI \
         -jobId $patient \

http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/build/conf/kylin-tools-log4j.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-tools-log4j.properties b/build/conf/kylin-tools-log4j.properties
index e975d18..2ccd772 100644
--- a/build/conf/kylin-tools-log4j.properties
+++ b/build/conf/kylin-tools-log4j.properties
@@ -35,3 +35,4 @@ log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t %c{1}:%L]: %
 #log4j.logger.org.apache.hadoop=ERROR
 log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.commons.httpclient=WARN

http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index 638d97b..04dbef7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -192,10 +192,10 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
     private void extractJobInfo(String taskId, File destDir) throws Exception {
         final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo();
         if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) {
-            String jobId = jobInfo.get(ExecutableConstants.MR_JOB_ID);
+            String mrJobId = jobInfo.get(ExecutableConstants.MR_JOB_ID);
             FileUtils.forceMkdir(destDir);
-            String[] mrJobArgs = { "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" };
-            new MrJobInfoExtractor(jobId).execute(mrJobArgs);
+            String[] mrJobArgs = { "-mrJobId", mrJobId, "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" };
+            new MrJobInfoExtractor().execute(mrJobArgs);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index ea19885..55b54a5 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -44,22 +44,29 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 
 public class MrJobInfoExtractor extends AbstractInfoExtractor {
-    private String mrJobId;
-    private String jobUrlPrefix;
-
     private static final Logger logger = LoggerFactory.getLogger(MrJobInfoExtractor.class);
 
     @SuppressWarnings("static-access")
-    private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default false.").create("includeCounters");
+    private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default true.").create("includeCounters");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_MR_JOB_ID = OptionBuilder.withArgName("mrJobId").hasArg().isRequired(false).withDescription("Specify MR Job Id").create("mrJobId");
 
-    private final int HTTP_RETRY = 3;
+    private static final int HTTP_RETRY = 3;
+
+    public MrJobInfoExtractor() {
+        packageType = "MR";
+
+        options.addOption(OPTION_INCLUDE_COUNTERS);
+        options.addOption(OPTION_MR_JOB_ID);
+    }
 
-    public MrJobInfoExtractor(String mrJobId) {
-        this.mrJobId = mrJobId;
-        String historyServerUrl = getRestCheckUrl();
-        this.jobUrlPrefix = historyServerUrl + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+    public static void main(String[] args) {
+        MrJobInfoExtractor extractor = new MrJobInfoExtractor();
+        extractor.execute(args);
     }
 
     private String getRestCheckUrl() {
@@ -84,13 +91,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         if (StringUtils.isEmpty(rmWebHost)) {
             return null;
         }
-        if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
-            //do nothing
-        } else {
+        if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
             rmWebHost = "http://" + rmWebHost;
         }
         Matcher m = pattern.matcher(rmWebHost);
-        m.matches();
+        Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
         return m.group(1) + m.group(2) + ":19888";
     }
 
@@ -115,19 +120,17 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         return msg;
     }
 
-    private void extractTaskCounter(String taskId, File exportDir, String taskUrl) throws IOException {
+    private void extractTaskCounter(String taskId, File exportDir, String taskUrl, String id) throws IOException {
         try {
             String response = getHttpResponse(taskUrl + taskId + "/counters");
-            FileUtils.writeStringToFile(new File(exportDir, taskId + ".json"), response, Charset.defaultCharset());
+            FileUtils.writeStringToFile(new File(exportDir, id + "_" + taskId + ".json"), response, Charset.defaultCharset());
         } catch (Exception e) {
             logger.warn("Failed to get task counters rest response" + e);
         }
     }
 
-    private void extractJobConf(File exportDir) throws IOException {
+    private void extractJobConf(File exportDir, String jobUrlPrefix) throws IOException {
         try {
-            String jobResponse = getHttpResponse(jobUrlPrefix);
-            JsonNode job = new ObjectMapper().readTree(jobResponse).path("job").get("state");
             String confUrl = jobUrlPrefix + "/conf/";
             String response = getHttpResponse(confUrl);
             FileUtils.writeStringToFile(new File(exportDir, "job_conf.json"), response, Charset.defaultCharset());
@@ -139,47 +142,99 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
     @Override
     protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
         try {
-            boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : false;
+            boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : true;
+            String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
+            String jobUrlPrefix = getRestCheckUrl() + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
             if (includeTaskCounter) {
-                extractTaskCounters(exportDir);
+                extractTaskCounters(exportDir, jobUrlPrefix);
             }
-            extractJobConf(exportDir);
+            extractJobCounters(exportDir, jobUrlPrefix);
+            extractJobConf(exportDir, jobUrlPrefix);
         } catch (Exception e) {
             logger.warn("Failed to get mr tasks rest response.", e);
         }
     }
 
-    private void extractTaskCounters(File exportDir) {
+    private void extractJobCounters(File exportDir, String jobUrlPrefix) {
+        String url = jobUrlPrefix + "/counters";
+        String response = getHttpResponse(url);
+        try {
+            File counterDir = new File(exportDir, "counters");
+            FileUtils.forceMkdir(counterDir);
+            FileUtils.writeStringToFile(new File(exportDir, "job_counters.json"), response, Charset.defaultCharset());
+        } catch (Exception e) {
+            logger.warn("Failed to get mr counters rest response.", e);
+        }
+    }
+
+    private void extractTaskCounters(File exportDir, String jobUrlPrefix) {
         try {
             String tasksUrl = jobUrlPrefix + "/tasks/";
             String tasksResponse = getHttpResponse(tasksUrl);
             JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
 
+            // find the max map and reduce duration
             String maxReduceId = null;
             String maxMapId = null;
             long maxMapElapsedTime = 0L;
             long maxReduceElapsedTime = 0L;
 
+            // find the min map and reduce duration
+            String minReduceId = null;
+            String minMapId = null;
+            long minMapElapsedTime = Integer.MAX_VALUE;
+            long minReduceElapsedTime = Integer.MAX_VALUE;
+
+            // find a normal map and reduce duration (the first one)
+            String normReduceId = null;
+            String normMapId = null;
+            long normMapElapsedTime = 0;
+            long normReduceElapsedTime = 0;
             for (JsonNode node : tasks) {
                 if (node.get("type").textValue().equals("MAP")) {
                     if (node.get("elapsedTime").longValue() >= maxMapElapsedTime) {
                         maxMapElapsedTime = node.get("elapsedTime").longValue();
                         maxMapId = node.get("id").textValue();
                     }
+
+                    if (node.get("elapsedTime").longValue() <= minMapElapsedTime) {
+                        minMapElapsedTime = node.get("elapsedTime").longValue();
+                        minMapId = node.get("id").textValue();
+                    }
+
+                    if (normMapElapsedTime == 0) {
+                        normMapElapsedTime = node.get("elapsedTime").longValue();
+                        normMapId = node.get("id").textValue();
+                    }
                 }
                 if (node.get("type").textValue().equals("REDUCE")) {
                     if (node.get("elapsedTime").longValue() >= maxReduceElapsedTime) {
                         maxReduceElapsedTime = node.get("elapsedTime").longValue();
                         maxReduceId = node.get("id").textValue();
                     }
+
+                    if (node.get("elapsedTime").longValue() <= minReduceElapsedTime) {
+                        minReduceElapsedTime = node.get("elapsedTime").longValue();
+                        minReduceId = node.get("id").textValue();
+                    }
+
+                    if (normReduceElapsedTime == 0) {
+                        normReduceElapsedTime = node.get("elapsedTime").longValue();
+                        normReduceId = node.get("id").textValue();
+                    }
                 }
             }
             File counterDir = new File(exportDir, "counters");
             FileUtils.forceMkdir(counterDir);
-            extractTaskCounter(maxMapId, counterDir, tasksUrl);
-            extractTaskCounter(maxReduceId, counterDir, tasksUrl);
+            extractTaskCounter(maxMapId, counterDir, tasksUrl, "max");
+            extractTaskCounter(maxReduceId, counterDir, tasksUrl, "max");
+            extractTaskCounter(minMapId, counterDir, tasksUrl, "min");
+            extractTaskCounter(minReduceId, counterDir, tasksUrl, "min");
+            extractTaskCounter(normMapId, counterDir, tasksUrl, "norm");
+            extractTaskCounter(normReduceId, counterDir, tasksUrl, "norm");
         } catch (Exception e) {
-            logger.warn("Failed to get mr tasks rest response" + e);
+            logger.warn("Failed to get mr tasks rest response.", e);
         }
     }
-}
+}
\ No newline at end of file