You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/03/21 03:43:15 UTC
[08/30] kylin git commit: minor, add MR job counters in job diagnosis
minor, add MR job counters in job diagnosis
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d4768c12
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d4768c12
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d4768c12
Branch: refs/heads/master-hbase0.98
Commit: d4768c12e423e96679e4d7db73c697f0378c0a94
Parents: e7510c2
Author: lidongsjtu <li...@apache.org>
Authored: Tue Mar 14 23:03:35 2017 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Tue Mar 14 23:05:24 2017 +0800
----------------------------------------------------------------------
build/bin/diag.sh | 2 +-
build/conf/kylin-tools-log4j.properties | 1 +
.../apache/kylin/tool/JobDiagnosisInfoCLI.java | 6 +-
.../apache/kylin/tool/MrJobInfoExtractor.java | 107 ++++++++++++++-----
4 files changed, 86 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/build/bin/diag.sh
----------------------------------------------------------------------
diff --git a/build/bin/diag.sh b/build/bin/diag.sh
index e9354a2..a995774 100644
--- a/build/bin/diag.sh
+++ b/build/bin/diag.sh
@@ -52,7 +52,7 @@ then
if [ ${#patient} -eq 36 ]; then
hbase ${KYLIN_EXTRA_START_OPTS} \
- -Dlog4j.configuration=kylin-server-log4j.properties \
+ -Dlog4j.configuration=kylin-tool-log4j.properties \
-Dcatalina.home=${tomcat_root} \
org.apache.kylin.tool.JobDiagnosisInfoCLI \
-jobId $patient \
http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/build/conf/kylin-tools-log4j.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-tools-log4j.properties b/build/conf/kylin-tools-log4j.properties
index e975d18..2ccd772 100644
--- a/build/conf/kylin-tools-log4j.properties
+++ b/build/conf/kylin-tools-log4j.properties
@@ -35,3 +35,4 @@ log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t %c{1}:%L]: %
#log4j.logger.org.apache.hadoop=ERROR
log4j.logger.org.apache.kylin=DEBUG
log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.commons.httpclient=WARN
http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index 638d97b..04dbef7 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -192,10 +192,10 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
private void extractJobInfo(String taskId, File destDir) throws Exception {
final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo();
if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) {
- String jobId = jobInfo.get(ExecutableConstants.MR_JOB_ID);
+ String mrJobId = jobInfo.get(ExecutableConstants.MR_JOB_ID);
FileUtils.forceMkdir(destDir);
- String[] mrJobArgs = { "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" };
- new MrJobInfoExtractor(jobId).execute(mrJobArgs);
+ String[] mrJobArgs = { "-mrJobId", mrJobId, "-destDir", destDir.getAbsolutePath(), "-compress", "false", "-submodule", "true" };
+ new MrJobInfoExtractor().execute(mrJobArgs);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d4768c12/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index ea19885..55b54a5 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -44,22 +44,29 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
public class MrJobInfoExtractor extends AbstractInfoExtractor {
- private String mrJobId;
- private String jobUrlPrefix;
-
private static final Logger logger = LoggerFactory.getLogger(MrJobInfoExtractor.class);
@SuppressWarnings("static-access")
- private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default false.").create("includeCounters");
+ private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default true.").create("includeCounters");
+
+ @SuppressWarnings("static-access")
+ private static final Option OPTION_MR_JOB_ID = OptionBuilder.withArgName("mrJobId").hasArg().isRequired(false).withDescription("Specify MR Job Id").create("mrJobId");
- private final int HTTP_RETRY = 3;
+ private static final int HTTP_RETRY = 3;
+
+ public MrJobInfoExtractor() {
+ packageType = "MR";
+
+ options.addOption(OPTION_INCLUDE_COUNTERS);
+ options.addOption(OPTION_MR_JOB_ID);
+ }
- public MrJobInfoExtractor(String mrJobId) {
- this.mrJobId = mrJobId;
- String historyServerUrl = getRestCheckUrl();
- this.jobUrlPrefix = historyServerUrl + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+ public static void main(String[] args) {
+ MrJobInfoExtractor extractor = new MrJobInfoExtractor();
+ extractor.execute(args);
}
private String getRestCheckUrl() {
@@ -84,13 +91,11 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
if (StringUtils.isEmpty(rmWebHost)) {
return null;
}
- if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
- //do nothing
- } else {
+ if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
rmWebHost = "http://" + rmWebHost;
}
Matcher m = pattern.matcher(rmWebHost);
- m.matches();
+ Preconditions.checkArgument(m.matches(), "Yarn master URL not found.");
return m.group(1) + m.group(2) + ":19888";
}
@@ -115,19 +120,17 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
return msg;
}
- private void extractTaskCounter(String taskId, File exportDir, String taskUrl) throws IOException {
+ private void extractTaskCounter(String taskId, File exportDir, String taskUrl, String id) throws IOException {
try {
String response = getHttpResponse(taskUrl + taskId + "/counters");
- FileUtils.writeStringToFile(new File(exportDir, taskId + ".json"), response, Charset.defaultCharset());
+ FileUtils.writeStringToFile(new File(exportDir, id + "_" + taskId + ".json"), response, Charset.defaultCharset());
} catch (Exception e) {
logger.warn("Failed to get task counters rest response" + e);
}
}
- private void extractJobConf(File exportDir) throws IOException {
+ private void extractJobConf(File exportDir, String jobUrlPrefix) throws IOException {
try {
- String jobResponse = getHttpResponse(jobUrlPrefix);
- JsonNode job = new ObjectMapper().readTree(jobResponse).path("job").get("state");
String confUrl = jobUrlPrefix + "/conf/";
String response = getHttpResponse(confUrl);
FileUtils.writeStringToFile(new File(exportDir, "job_conf.json"), response, Charset.defaultCharset());
@@ -139,47 +142,99 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
@Override
protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
try {
- boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : false;
+ boolean includeTaskCounter = optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : true;
+ String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
+ String jobUrlPrefix = getRestCheckUrl() + "/ws/v1/history/mapreduce/jobs/" + mrJobId;
+
if (includeTaskCounter) {
- extractTaskCounters(exportDir);
+ extractTaskCounters(exportDir, jobUrlPrefix);
}
- extractJobConf(exportDir);
+ extractJobCounters(exportDir, jobUrlPrefix);
+ extractJobConf(exportDir, jobUrlPrefix);
} catch (Exception e) {
logger.warn("Failed to get mr tasks rest response.", e);
}
}
- private void extractTaskCounters(File exportDir) {
+ private void extractJobCounters(File exportDir, String jobUrlPrefix) {
+ String url = jobUrlPrefix + "/counters";
+ String response = getHttpResponse(url);
+ try {
+ File counterDir = new File(exportDir, "counters");
+ FileUtils.forceMkdir(counterDir);
+ FileUtils.writeStringToFile(new File(exportDir, "job_counters.json"), response, Charset.defaultCharset());
+ } catch (Exception e) {
+ logger.warn("Failed to get mr counters rest response.", e);
+ }
+ }
+
+ private void extractTaskCounters(File exportDir, String jobUrlPrefix) {
try {
String tasksUrl = jobUrlPrefix + "/tasks/";
String tasksResponse = getHttpResponse(tasksUrl);
JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
+ // find the max map and reduce duation
String maxReduceId = null;
String maxMapId = null;
long maxMapElapsedTime = 0L;
long maxReduceElapsedTime = 0L;
+ // find the min map and reduce duration
+ String minReduceId = null;
+ String minMapId = null;
+ long minMapElapsedTime = Integer.MAX_VALUE;
+ long minReduceElapsedTime = Integer.MAX_VALUE;
+
+ // find a normal map and reduce duration (the first one)
+ String normReduceId = null;
+ String normMapId = null;
+ long normMapElapsedTime = 0;
+ long normReduceElapsedTime = 0;
for (JsonNode node : tasks) {
if (node.get("type").textValue().equals("MAP")) {
if (node.get("elapsedTime").longValue() >= maxMapElapsedTime) {
maxMapElapsedTime = node.get("elapsedTime").longValue();
maxMapId = node.get("id").textValue();
}
+
+ if (node.get("elapsedTime").longValue() <= minMapElapsedTime) {
+ minMapElapsedTime = node.get("elapsedTime").longValue();
+ minMapId = node.get("id").textValue();
+ }
+
+ if (normMapElapsedTime == 0) {
+ normMapElapsedTime = node.get("elapsedTime").longValue();
+ normMapId = node.get("id").textValue();
+ }
}
if (node.get("type").textValue().equals("REDUCE")) {
if (node.get("elapsedTime").longValue() >= maxReduceElapsedTime) {
maxReduceElapsedTime = node.get("elapsedTime").longValue();
maxReduceId = node.get("id").textValue();
}
+
+ if (node.get("elapsedTime").longValue() <= minReduceElapsedTime) {
+ minReduceElapsedTime = node.get("elapsedTime").longValue();
+ minReduceId = node.get("id").textValue();
+ }
+
+ if (normReduceElapsedTime == 0) {
+ normReduceElapsedTime = node.get("elapsedTime").longValue();
+ normReduceId = node.get("id").textValue();
+ }
}
}
File counterDir = new File(exportDir, "counters");
FileUtils.forceMkdir(counterDir);
- extractTaskCounter(maxMapId, counterDir, tasksUrl);
- extractTaskCounter(maxReduceId, counterDir, tasksUrl);
+ extractTaskCounter(maxMapId, counterDir, tasksUrl, "max");
+ extractTaskCounter(maxReduceId, counterDir, tasksUrl, "max");
+ extractTaskCounter(minMapId, counterDir, tasksUrl, "min");
+ extractTaskCounter(minReduceId, counterDir, tasksUrl, "min");
+ extractTaskCounter(normMapId, counterDir, tasksUrl, "norm");
+ extractTaskCounter(normReduceId, counterDir, tasksUrl, "norm");
} catch (Exception e) {
- logger.warn("Failed to get mr tasks rest response" + e);
+ logger.warn("Failed to get mr tasks rest response.", e);
}
}
-}
+}
\ No newline at end of file